# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2023 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import io
import os
import platform
import sys
import locale
import logging
import uuid
import time
import wsgiref.util
import tempfile
import psutil

from itertools import chain

import msgpack
import configparser

from pyramid.config import Configurator
from pyramid.wsgi import wsgiapp
from pyramid.response import Response

from vcsserver.base import BytesEnvelope, BinaryEnvelope
from vcsserver.lib.rc_json import json
from vcsserver.config.settings_maker import SettingsMaker
from vcsserver.str_utils import safe_int
from vcsserver.lib.statsd_client import StatsdClient
from vcsserver.tweens.request_wrapper import get_headers_call_context

import vcsserver
from vcsserver import remote_wsgi, scm_app, settings, hgpatches
from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
from vcsserver.echo_stub.echo_app import EchoApp
from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
from vcsserver.lib.exc_tracking import store_exception, format_exc
from vcsserver.server import VcsServer

# When True, a failing backend import is re-raised instead of being tolerated.
strict_vcs = True

git_import_err = None
try:
    from vcsserver.remote.git_remote import GitFactory, GitRemote
except ImportError as e:
    GitFactory = None
    GitRemote = None
    git_import_err = e
    if strict_vcs:
        raise


hg_import_err = None
try:
    from vcsserver.remote.hg_remote import MercurialFactory, HgRemote
except ImportError as e:
    MercurialFactory = None
    HgRemote = None
    hg_import_err = e
    if strict_vcs:
        raise


svn_import_err = None
try:
    from vcsserver.remote.svn_remote import SubversionFactory, SvnRemote
except ImportError as e:
    SubversionFactory = None
    SvnRemote = None
    svn_import_err = e
    if strict_vcs:
        raise

log = logging.getLogger(__name__)

# due to Mercurial/glibc2.27 problems we need to detect if locale settings are
# causing problems and "fix" it in case they do and fallback to LC_ALL = C
try:
    locale.setlocale(locale.LC_ALL, '')
except locale.Error as e:
    log.error(
        'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
    os.environ['LC_ALL'] = 'C'


def _is_request_chunked(environ):
    """Return True when the WSGI environ declares `Transfer-Encoding: chunked`."""
    stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
    return stream


def log_max_fd():
    """
    Log the second element of the process' RLIMIT_NOFILE pair.

    This is best-effort only: any failure to read the limit is logged at
    debug level and otherwise ignored.
    """
    try:
        maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
        log.info('Max file descriptors value: %s', maxfd)
    except Exception:
        # never fail startup because of an informational log line
        log.debug('Unable to read max file descriptors value', exc_info=True)


class VCS(object):
    """
    Container creating the remote objects for each available vcs backend.

    After construction `_git_remote`, `_hg_remote` and `_svn_remote` hold the
    backend remote, or None when the backend could not be imported.
    `_vcsserver` always holds a `VcsServer` instance.
    """

    def __init__(self, locale_conf=None, cache_config=None):
        """
        :param locale_conf: locale name passed to `locale.setlocale`
        :param cache_config: stored on the instance as `cache_config`
        """
        self.locale = locale_conf
        self.cache_config = cache_config
        self._configure_locale()

        log_max_fd()

        # Always define the attributes, so a failed (non strict) backend import
        # results in a `None` remote instead of an AttributeError later on.
        self._git_remote = None
        self._hg_remote = None
        self._svn_remote = None

        if GitFactory and GitRemote:
            git_factory = GitFactory()
            self._git_remote = GitRemote(git_factory)
        else:
            log.error("Git client import failed: %s", git_import_err)

        if MercurialFactory and HgRemote:
            hg_factory = MercurialFactory()
            self._hg_remote = HgRemote(hg_factory)
        else:
            log.error("Mercurial client import failed: %s", hg_import_err)

        if SubversionFactory and SvnRemote:
            svn_factory = SubversionFactory()
            # hg factory is used for svn url validation
            # NOTE(review): this requires the Mercurial import to have succeeded
            hg_factory = MercurialFactory()
            self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
        else:
            log.error("Subversion client import failed: %s", svn_import_err)

        self._vcsserver = VcsServer()

    def _configure_locale(self):
        """Apply `self.locale` as LC_ALL; a `locale.Error` is logged, not raised."""
        if self.locale:
            log.info('Settings locale: `LC_ALL` to %s', self.locale)
        else:
            log.info('Configuring locale subsystem based on environment variables')

        try:
            # If self.locale is the empty string, then the locale
            # module will use the environment variables. See the
            # documentation of the package `locale`.
            locale.setlocale(locale.LC_ALL, self.locale)

            language_code, encoding = locale.getlocale()
            log.info(
                'Locale set to language code "%s" with encoding "%s".',
                language_code, encoding)
        except locale.Error:
            log.exception('Cannot set locale, not configuring the locale system')


class WsgiProxy(object):
    """
    WSGI callable forwarding a msgpack encoded call to `wsgi.handle`.

    The response body is a stream of msgpack packed items: first `error`,
    `status` and `headers`, followed by every item of the returned data.
    """

    def __init__(self, wsgi):
        self.wsgi = wsgi

    def __call__(self, environ, start_response):
        input_data = environ['wsgi.input'].read()
        input_data = msgpack.unpackb(input_data)

        error = None
        try:
            data, status, headers = self.wsgi.handle(
                input_data['environment'], input_data['input_data'],
                *input_data['args'], **input_data['kwargs'])
        except Exception as e:
            # the failure is reported in-band, as the first item of the stream
            data, status, headers = [], None, None
            error = {
                'message': str(e),
                '_vcs_kind': getattr(e, '_vcs_kind', None)
            }

        # PEP 3333: status has to be a string, headers a list of tuples
        start_response('200 OK', [])
        return self._iterator(error, status, headers, data)

    def _iterator(self, error, status, headers, data):
        """Yield `error`, `status`, `headers` and then each item of `data`, msgpack packed."""
        initial_data = [
            error,
            status,
            headers,
        ]

        for d in chain(initial_data, data):
            yield msgpack.packb(d)


def not_found(request):
    """Not-found view, returns a static 404 status payload."""
    return {'status': '404 NOT FOUND'}


class VCSViewPredicate(object):
    """View predicate matching requests whose `backend` is one of the given remotes."""

    def __init__(self, val, config):
        self.remotes = val

    def text(self):
        return f'vcs view method = {list(self.remotes.keys())}'

    phash = text

    def __call__(self, context, request):
        """
        View predicate that returns true if given backend is supported by
        defined remotes.
        """
        backend = request.matchdict.get('backend')
        return backend in self.remotes


class HTTPApplication(object):
    """
    Pyramid application exposing the vcs remotes over http.

    Registers the rpc views (`/{backend}`, `/{backend}/stream`), the wsgi
    proxies (`/proxy/hg`, `/proxy/git`), the streaming vcs views
    (`/stream/hg/*`, `/stream/git/*`) and the `/status`, `/_service` views.
    """

    # exception class names which are passed to the caller in error `type`
    ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')

    remote_wsgi = remote_wsgi
    _use_echo_app = False

    def __init__(self, settings=None, global_config=None):
        """
        :param settings: application settings mapping
        :param global_config: global configuration mapping, its `__file__` key
            is read by `service_view` to locate the .ini file
        """
        # both arguments are optional, normalize them so the lookups below work
        settings = {} if settings is None else settings
        global_config = {} if global_config is None else global_config

        self.config = Configurator(settings=settings)
        # Init our statsd at very start
        self.config.registry.statsd = StatsdClient.statsd
        self.config.registry.vcs_call_context = {}

        self.global_config = global_config
        self.config.include('vcsserver.lib.rc_cache')
        self.config.include('vcsserver.lib.rc_cache.archive_cache')

        settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
        vcs = VCS(locale_conf=settings_locale, cache_config=settings)
        candidate_remotes = {
            'hg': vcs._hg_remote,
            'git': vcs._git_remote,
            'svn': vcs._svn_remote,
            'server': vcs._vcsserver,
        }
        # backends which failed to import are not exposed, so the `vcs_view`
        # predicate rejects them instead of dispatching calls to `None`
        self._remotes = {
            name: remote
            for name, remote in candidate_remotes.items()
            if remote is not None
        }
        if settings.get('dev.use_echo_app', 'false').lower() == 'true':
            self._use_echo_app = True
            log.warning("Using EchoApp for VCS operations.")
            self.remote_wsgi = remote_wsgi_stub

        self._configure_settings(global_config, settings)

        self._configure()

    def _configure_settings(self, global_config, app_settings):
        """
        Configure the settings module.
        """
        # app settings take precedence over the global config
        settings_merged = global_config.copy()
        settings_merged.update(app_settings)

        git_path = app_settings.get('git_path', None)
        if git_path:
            settings.GIT_EXECUTABLE = git_path
        binary_dir = app_settings.get('core.binary_dir', None)
        if binary_dir:
            settings.BINARY_DIR = binary_dir

        # Store the settings to make them available to other modules.
        vcsserver.PYRAMID_SETTINGS = settings_merged
        vcsserver.CONFIG = settings_merged

    def _configure(self):
        """Register renderer, routes, views, predicate, tween and request method."""
        self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)

        self.config.add_route('service', '/_service')
        self.config.add_route('status', '/status')
        self.config.add_route('hg_proxy', '/proxy/hg')
        self.config.add_route('git_proxy', '/proxy/git')

        # rpc methods
        self.config.add_route('vcs', '/{backend}')

        # streaming rpc remote methods
        self.config.add_route('vcs_stream', '/{backend}/stream')

        # vcs operations clone/push as streaming
        self.config.add_route('stream_git', '/stream/git/*repo_name')
        self.config.add_route('stream_hg', '/stream/hg/*repo_name')

        self.config.add_view(self.status_view, route_name='status', renderer='json')
        self.config.add_view(self.service_view, route_name='service', renderer='msgpack')

        self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
        self.config.add_view(self.git_proxy(), route_name='git_proxy')
        self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
                             vcs_view=self._remotes)
        self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
                             vcs_view=self._remotes)

        self.config.add_view(self.hg_stream(), route_name='stream_hg')
        self.config.add_view(self.git_stream(), route_name='stream_git')

        self.config.add_view_predicate('vcs_view', VCSViewPredicate)

        self.config.add_notfound_view(not_found, renderer='json')

        self.config.add_view(self.handle_vcs_exception, context=Exception)

        self.config.add_tween(
            'vcsserver.tweens.request_wrapper.RequestWrapperTween',
        )
        self.config.add_request_method(
            'vcsserver.lib.request_counter.get_request_counter',
            'request_count')

    def wsgi_app(self):
        """Return the WSGI application built from the collected configuration."""
        return self.config.make_wsgi_app()

    def _vcs_view_params(self, request):
        """
        Decode the msgpack request body of an rpc call.

        When the payload carries a `wire`, its `context` is converted to a
        `uuid.UUID` and the wire is prepended to the positional arguments.

        :returns: tuple of `(payload, remote, method, args, kwargs)`
        """
        remote = self._remotes[request.matchdict['backend']]
        payload = msgpack.unpackb(request.body, use_list=True)

        method = payload.get('method')
        params = payload['params']
        wire = params.get('wire')
        args = params.get('args')
        kwargs = params.get('kwargs')
        context_uid = None

        request.registry.vcs_call_context = {
            'method': method,
            'repo_name': payload.get('_repo_name'),
        }

        if wire:
            try:
                wire['context'] = context_uid = uuid.UUID(wire['context'])
            except KeyError:
                pass
            args.insert(0, wire)
        repo_state_uid = wire.get('repo_state_uid') if wire else None

        # NOTE(marcink): trading complexity for slight performance
        if log.isEnabledFor(logging.DEBUG):
            # also we SKIP printing out any of those methods args since they maybe excessive
            just_args_methods = {
                'commitctx': ('content', 'removed', 'updated'),
                'commit': ('content', 'removed', 'updated')
            }
            if method in just_args_methods:
                skip_args = just_args_methods[method]
                call_args = ''
                call_kwargs = {}
                for k in kwargs:
                    if k in skip_args:
                        # replace our skip key with dummy
                        call_kwargs[k] = f'RemovedParam({k})'
                    else:
                        call_kwargs[k] = kwargs[k]
            else:
                # only strip the first argument if it is the prepended wire
                call_args = args[1:] if wire else args
                call_kwargs = kwargs

            log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
                      method, call_args, call_kwargs, context_uid, repo_state_uid)

        statsd = request.registry.statsd
        if statsd:
            statsd.incr(
                'vcsserver_method_total', tags=[
                    f"method:{method}",
                ])
        return payload, remote, method, args, kwargs

    def vcs_view(self, request):
        """
        Execute the requested remote method and return an rpc style response.

        :returns: `{'id': ..., 'result': ...}` on success, or
            `{'id': ..., 'error': {...}}` when the remote call raised.
        """
        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        payload_id = payload.get('id')

        try:
            resp = getattr(remote, method)(*args, **kwargs)
        except Exception as e:
            exc_info = list(sys.exc_info())

            org_exc = getattr(e, '_org_exc', None)
            org_exc_name = None
            org_exc_tb = ''
            should_store_exc = True
            if org_exc:
                org_exc_name = org_exc.__class__.__name__
                org_exc_tb = getattr(e, '_org_exc_tb', '')
                # replace our "faked" exception with our org
                exc_info[0] = org_exc.__class__
                exc_info[1] = org_exc

                module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
                exc_fqn = module_name + '.' + org_exc_name

                # these lookup failures are not persisted in the exception store
                if exc_fqn in ['mercurial.error.RepoLookupError',
                               'vcsserver.exceptions.RefNotFoundException']:
                    should_store_exc = False

            if should_store_exc:
                store_exception(id(exc_info), exc_info, request_path=request.path)

            tb_info = format_exc(exc_info)

            # only the allowed exception names are passed in `type`
            type_ = e.__class__.__name__
            if type_ not in self.ALLOWED_EXCEPTIONS:
                type_ = None

            resp = {
                'id': payload_id,
                'error': {
                    'message': str(e),
                    'traceback': tb_info,
                    'org_exc': org_exc_name,
                    'org_exc_tb': org_exc_tb,
                    'type': type_,
                    '_vcs_kind': getattr(e, '_vcs_kind', None),
                }
            }
        else:
            resp = {
                'id': payload_id,
                'result': resp
            }
        log.debug('Serving data for method %s', method)
        return resp

    def vcs_stream_view(self, request):
        """
        Execute the requested remote method and stream its result in chunks.

        The chunk size is taken from the payload `chunk_size`, 4096 when it is
        missing or 0. Exceptions of the remote call are not handled here.
        """
        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        # this method has a stream: marker we remove it here
        method = method.split('stream:')[-1]
        chunk_size = safe_int(payload.get('chunk_size')) or 4096

        resp = getattr(remote, method)(*args, **kwargs)

        def get_chunked_data(method_resp):
            stream = io.BytesIO(method_resp)
            while True:
                chunk = stream.read(chunk_size)
                if not chunk:
                    break
                yield chunk

        response = Response(app_iter=get_chunked_data(resp))
        response.content_type = 'application/octet-stream'

        return response

    def status_view(self, request):
        """Return status, version, platform node name and pid of this process."""
        _platform_id = platform.uname()[1] or 'instance'

        return {
            "status": "OK",
            "vcsserver_version": vcsserver.__version__,
            "platform": _platform_id,
            "pid": os.getpid(),
        }

    def service_view(self, request):
        """
        Return version, .ini configuration, process environ and the sent payload.

        Failures reading the .ini file are logged, the config values are then
        returned as empty dicts.
        """
        payload = msgpack.unpackb(request.body, use_list=True)
        server_config, app_config = {}, {}

        try:
            path = self.global_config['__file__']
            config = configparser.RawConfigParser()

            config.read(path)

            if config.has_section('server:main'):
                server_config = dict(config.items('server:main'))
            if config.has_section('app:main'):
                app_config = dict(config.items('app:main'))

        except Exception:
            log.exception('Failed to read .ini file for display')

        environ = list(os.environ.items())

        resp = {
            'id': payload.get('id'),
            'result': dict(
                version=vcsserver.__version__,
                config=server_config,
                app_config=app_config,
                environ=environ,
                payload=payload,
            )
        }
        return resp

    def _msgpack_renderer_factory(self, info):
        """
        Renderer factory packing view results with msgpack.

        Results wrapped in `BytesEnvelope`/`BinaryEnvelope` are packed with
        `use_bin_type=True` and served as `application/x-msgpack-bin`.
        """

        def _render(value, system):
            bin_type = False
            res = value.get('result')
            if isinstance(res, BytesEnvelope):
                log.debug('Result is wrapped in BytesEnvelope type')
                bin_type = True
            elif isinstance(res, BinaryEnvelope):
                log.debug('Result is wrapped in BinaryEnvelope type')
                value['result'] = res.val
                bin_type = True

            request = system.get('request')
            if request is not None:
                response = request.response
                ct = response.content_type
                # don't override a content type explicitly set by the view
                if ct == response.default_content_type:
                    response.content_type = 'application/x-msgpack'
                    if bin_type:
                        response.content_type = 'application/x-msgpack-bin'

            return msgpack.packb(value, use_bin_type=bin_type)
        return _render

    def set_env_from_config(self, environ, config):
        """
        Update `environ` based on the `rhodecode` entry of `config`.

        Sets `REMOTE_USER`/`HGUSER` from `username`, `REMOTE_HOST` from `ip`,
        and `wsgi.input_terminated` for chunked requests.

        :param config: iterable of entries, the first one with `elem[0]` equal
            to `rhodecode` carries a JSON document in `elem[2]`
        """
        dict_conf = {}
        try:
            for elem in config:
                if elem[0] == 'rhodecode':
                    dict_conf = json.loads(elem[2])
                    break
        except Exception:
            # keep going with an empty config, the chunked request flag below
            # doesn't depend on it
            log.exception('Failed to fetch SCM CONFIG')
            dict_conf = {}

        username = dict_conf.get('username')
        if username:
            environ['REMOTE_USER'] = username
            # mercurial specific, some extension api rely on this
            environ['HGUSER'] = username

        ip = dict_conf.get('ip')
        if ip:
            environ['REMOTE_HOST'] = ip

        if _is_request_chunked(environ):
            # set the compatibility flag for webob
            environ['wsgi.input_terminated'] = True

    def hg_proxy(self):
        """Return the view proxying calls to `remote_wsgi.HgRemoteWsgi`."""
        @wsgiapp
        def _hg_proxy(environ, start_response):
            app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
            return app(environ, start_response)
        return _hg_proxy

    def git_proxy(self):
        """Return the view proxying calls to `remote_wsgi.GitRemoteWsgi`."""
        @wsgiapp
        def _git_proxy(environ, start_response):
            app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
            return app(environ, start_response)
        return _git_proxy

    def hg_stream(self):
        """Return the view serving Mercurial streaming operations, or the EchoApp stub."""
        if self._use_echo_app:
            @wsgiapp
            def _hg_stream(environ, start_response):
                app = EchoApp('fake_path', 'fake_name', None)
                return app(environ, start_response)
            return _hg_stream
        else:
            @wsgiapp
            def _hg_stream(environ, start_response):
                log.debug('http-app: handling hg stream')
                call_context = get_headers_call_context(environ)

                repo_path = call_context['repo_path']
                repo_name = call_context['repo_name']
                config = call_context['repo_config']

                app = scm_app.create_hg_wsgi_app(
                    repo_path, repo_name, config)

                # Consistent path information for hgweb
                environ['PATH_INFO'] = call_context['path_info']
                environ['REPO_NAME'] = repo_name
                self.set_env_from_config(environ, config)

                log.debug('http-app: starting app handler '
                          'with %s and process request', app)
                # hop-by-hop headers are stripped from the response
                return app(environ, ResponseFilter(start_response))
            return _hg_stream

    def git_stream(self):
        """Return the view serving Git (and Git LFS) streaming operations, or the EchoApp stub."""
        if self._use_echo_app:
            @wsgiapp
            def _git_stream(environ, start_response):
                app = EchoApp('fake_path', 'fake_name', None)
                return app(environ, start_response)
            return _git_stream
        else:
            @wsgiapp
            def _git_stream(environ, start_response):
                log.debug('http-app: handling git stream')

                call_context = get_headers_call_context(environ)

                repo_path = call_context['repo_path']
                repo_name = call_context['repo_name']
                config = call_context['repo_config']

                environ['PATH_INFO'] = call_context['path_info']
                self.set_env_from_config(environ, config)

                content_type = environ.get('CONTENT_TYPE', '')

                path = environ['PATH_INFO']
                is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
                log.debug(
                    'LFS: Detecting if request `%s` is LFS server path based '
                    'on content type:`%s`, is_lfs:%s',
                    path, content_type, is_lfs_request)

                if not is_lfs_request:
                    # fallback detection by path
                    if GIT_LFS_PROTO_PAT.match(path):
                        is_lfs_request = True
                    log.debug(
                        'LFS: fallback detection by path of: `%s`, is_lfs:%s',
                        path, is_lfs_request)

                if is_lfs_request:
                    app = scm_app.create_git_lfs_wsgi_app(
                        repo_path, repo_name, config)
                else:
                    app = scm_app.create_git_wsgi_app(
                        repo_path, repo_name, config)

                log.debug('http-app: starting app handler '
                          'with %s and process request', app)

                return app(environ, start_response)

            return _git_stream

    def handle_vcs_exception(self, exception, request):
        """
        Exception view registered for every `Exception`.

        Exceptions whose `_vcs_kind` is `repo_locked` or `repo_branch_protected`
        are translated into their dedicated http responses. Anything else is
        stored, logged, counted in statsd and re-raised.
        """
        _vcs_kind = getattr(exception, '_vcs_kind', '')

        if _vcs_kind == 'repo_locked':
            headers_call_context = get_headers_call_context(request.environ)
            status_code = safe_int(headers_call_context['locked_status_code'])

            return HTTPRepoLocked(
                title=str(exception), status_code=status_code, headers=[('X-Rc-Locked', '1')])

        elif _vcs_kind == 'repo_branch_protected':
            # no custom status code is passed here, the response default is used
            return HTTPRepoBranchProtected(
                title=str(exception), headers=[('X-Rc-Branch-Protection', '1')])

        exc_info = request.exc_info
        traceback_info = 'unavailable'
        if exc_info:
            # nothing to store or format when exception info isn't available
            store_exception(id(exc_info), exc_info, request_path=request.path)
            traceback_info = format_exc(exc_info)

        log.error(
            'error occurred handling this request for path: %s, \n%s',
            request.path, traceback_info)

        statsd = request.registry.statsd
        if statsd:
            exc_type = f"{exception.__class__.__module__}.{exception.__class__.__name__}"
            statsd.incr('vcsserver_exception_total',
                        tags=[f"type:{exc_type}"])
        raise exception


class ResponseFilter(object):
    """`start_response` wrapper dropping hop-by-hop headers from the response."""

    def __init__(self, start_response):
        self._start_response = start_response

    def __call__(self, status, response_headers, exc_info=None):
        headers = tuple(
            (h, v) for h, v in response_headers
            if not wsgiref.util.is_hop_by_hop(h))
        return self._start_response(status, headers, exc_info)


def sanitize_settings_and_apply_defaults(global_config, settings):
    """
    Register the default values of all known settings on `settings`.

    Logging is enabled using the `logging.ini` located next to the file
    referenced by `global_config['__file__']`, and `env_expand()` is called
    at the end.
    """
    # NOTE(review): unused below, kept in case the constructor has side effects
    _global_settings_maker = SettingsMaker(global_config)
    settings_maker = SettingsMaker(settings)

    settings_maker.make_setting('logging.autoconfigure', False, parser='bool')

    logging_conf = os.path.join(os.path.dirname(global_config.get('__file__')), 'logging.ini')
    settings_maker.enable_logging(logging_conf)

    # Default includes, possible to change as a user
    pyramid_includes = settings_maker.make_setting('pyramid.includes', [], parser='list:newline')
    log.debug("Using the following pyramid.includes: %s", pyramid_includes)

    settings_maker.make_setting('__file__', global_config.get('__file__'))

    settings_maker.make_setting('pyramid.default_locale_name', 'en')
    settings_maker.make_setting('locale', 'en_US.UTF-8')

    settings_maker.make_setting('core.binary_dir', '')

    temp_store = tempfile.gettempdir()
    default_cache_dir = os.path.join(temp_store, 'rc_cache')
    # save default, cache dir, and use it for all backends later.
    default_cache_dir = settings_maker.make_setting(
        'cache_dir',
        default=default_cache_dir, default_when_empty=True,
        parser='dir:ensured')

    # exception store cache
    settings_maker.make_setting(
        'exception_tracker.store_path',
        default=os.path.join(default_cache_dir, 'exc_store'), default_when_empty=True,
        parser='dir:ensured'
    )

    # repo_object cache defaults
    settings_maker.make_setting(
        'rc_cache.repo_object.backend',
        default='dogpile.cache.rc.file_namespace',
        parser='string')
    settings_maker.make_setting(
        'rc_cache.repo_object.expiration_time',
        default=30 * 24 * 60 * 60,  # 30days
        parser='int')
    settings_maker.make_setting(
        'rc_cache.repo_object.arguments.filename',
        default=os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'),
        parser='string')

    # statsd
    settings_maker.make_setting('statsd.enabled', False, parser='bool')
    settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string')
    settings_maker.make_setting('statsd.statsd_port', 9125, parser='int')
    settings_maker.make_setting('statsd.statsd_prefix', '')
    settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool')

    settings_maker.env_expand()


def main(global_config, **settings):
    """
    Create and return the vcsserver WSGI application.

    :param global_config: global configuration mapping
    :param settings: application settings
    """
    start_time = time.time()
    log.info('Pyramid app config starting')

    if MercurialFactory:
        hgpatches.patch_largefiles_capabilities()
        hgpatches.patch_subrepo_type_mapping()

    # Fill in and sanitize the defaults & do ENV expansion
    sanitize_settings_and_apply_defaults(global_config, settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    pyramid_app = HTTPApplication(settings=settings, global_config=global_config).wsgi_app()
    total_time = time.time() - start_time
    log.info('Pyramid app created and configured in %.2fs', total_time)
    return pyramid_app