# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2020 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA

import io
import os
import sys
import base64
import locale
import logging
import uuid
import time
import wsgiref.util
import traceback
import tempfile
import psutil

from itertools import chain

import msgpack
import configparser

from pyramid.config import Configurator
from pyramid.wsgi import wsgiapp
from pyramid.response import Response

from vcsserver.base import BinaryEnvelope
from vcsserver.lib.rc_json import json
from vcsserver.config.settings_maker import SettingsMaker
from vcsserver.str_utils import safe_int, safe_bytes, safe_str
from vcsserver.lib.statsd_client import StatsdClient

log = logging.getLogger(__name__)

# due to Mercurial/glibc2.27 problems we need to detect if locale settings are
# causing problems and "fix" it in case they do and fallback to LC_ALL = C

try:
    locale.setlocale(locale.LC_ALL, '')
except locale.Error as e:
    log.error(
        'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
    os.environ['LC_ALL'] = 'C'


import vcsserver
from vcsserver import remote_wsgi, scm_app, settings, hgpatches
from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
from vcsserver.echo_stub.echo_app import EchoApp
from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
from vcsserver.lib.exc_tracking import store_exception
from vcsserver.server import VcsServer

strict_vcs = True

git_import_err = None
try:
    from vcsserver.remote.git import GitFactory, GitRemote
except ImportError as e:
    GitFactory = None
    GitRemote = None
    git_import_err = e
    if strict_vcs:
        raise


hg_import_err = None
try:
    from vcsserver.remote.hg import MercurialFactory, HgRemote
except ImportError as e:
    MercurialFactory = None
    HgRemote = None
    hg_import_err = e
    if strict_vcs:
        raise


svn_import_err = None
try:
    from vcsserver.remote.svn import SubversionFactory, SvnRemote
except ImportError as e:
    SubversionFactory = None
    SvnRemote = None
    svn_import_err = e
    if strict_vcs:
        raise


def _is_request_chunked(environ):
    stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
    return stream


def log_max_fd():
    try:
        maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
        log.info('Max file descriptors value: %s', maxfd)
    except Exception:
        pass


class VCS(object):
    def __init__(self, locale_conf=None, cache_config=None):
        self.locale = locale_conf
        self.cache_config = cache_config
        self._configure_locale()

        log_max_fd()

        if GitFactory and GitRemote:
            git_factory = GitFactory()
            self._git_remote = GitRemote(git_factory)
        else:
            log.error("Git client import failed: %s", git_import_err)

        if MercurialFactory and HgRemote:
            hg_factory = MercurialFactory()
            self._hg_remote = HgRemote(hg_factory)
        else:
            log.error("Mercurial client import failed: %s", hg_import_err)

        if SubversionFactory and SvnRemote:
            svn_factory = SubversionFactory()

            # hg factory is used for svn url validation
            hg_factory = MercurialFactory()
            self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
        else:
            log.error("Subversion client import failed: %s", svn_import_err)

        self._vcsserver = VcsServer()

    def _configure_locale(self):
        if self.locale:
            log.info('Settings locale: `LC_ALL` to %s', self.locale)
        else:
            log.info('Configuring locale subsystem based on environment variables')
        try:
            # If self.locale is the empty string, then the locale
            # module will use the environment variables. See the
            # documentation of the package `locale`.
            locale.setlocale(locale.LC_ALL, self.locale)

            language_code, encoding = locale.getlocale()
            log.info(
                'Locale set to language code "%s" with encoding "%s".',
                language_code, encoding)
        except locale.Error:
            log.exception('Cannot set locale, not configuring the locale system')


class WsgiProxy(object):
    def __init__(self, wsgi):
        self.wsgi = wsgi

    def __call__(self, environ, start_response):
        input_data = environ['wsgi.input'].read()
        input_data = msgpack.unpackb(input_data)

        error = None
        try:
            data, status, headers = self.wsgi.handle(
                input_data['environment'], input_data['input_data'],
                *input_data['args'], **input_data['kwargs'])
        except Exception as e:
            data, status, headers = [], None, None
            error = {
                'message': str(e),
                '_vcs_kind': getattr(e, '_vcs_kind', None)
            }

        start_response(200, {})
        return self._iterator(error, status, headers, data)

    def _iterator(self, error, status, headers, data):
        initial_data = [
            error,
            status,
            headers,
        ]

        for d in chain(initial_data, data):
            yield msgpack.packb(d)


def not_found(request):
    return {'status': '404 NOT FOUND'}


class VCSViewPredicate(object):
    def __init__(self, val, config):
        self.remotes = val

    def text(self):
        return f'vcs view method = {list(self.remotes.keys())}'

    phash = text

    def __call__(self, context, request):
        """
        View predicate that returns true if given backend is supported by
        defined remotes.
        """
        backend = request.matchdict.get('backend')
        return backend in self.remotes


class HTTPApplication(object):
    ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')

    remote_wsgi = remote_wsgi
    _use_echo_app = False

    def __init__(self, settings=None, global_config=None):

        self.config = Configurator(settings=settings)
        # Init our statsd at very start
        self.config.registry.statsd = StatsdClient.statsd
        self.config.registry.vcs_call_context = {}

        self.global_config = global_config
        self.config.include('vcsserver.lib.rc_cache')

        settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
        vcs = VCS(locale_conf=settings_locale, cache_config=settings)
        self._remotes = {
            'hg': vcs._hg_remote,
            'git': vcs._git_remote,
            'svn': vcs._svn_remote,
            'server': vcs._vcsserver,
        }
        if settings.get('dev.use_echo_app', 'false').lower() == 'true':
            self._use_echo_app = True
            log.warning("Using EchoApp for VCS operations.")
            self.remote_wsgi = remote_wsgi_stub

        self._configure_settings(global_config, settings)

        self._configure()

    def _configure_settings(self, global_config, app_settings):
        """
        Configure the settings module.
        """
        settings_merged = global_config.copy()
        settings_merged.update(app_settings)

        git_path = app_settings.get('git_path', None)
        if git_path:
            settings.GIT_EXECUTABLE = git_path
        binary_dir = app_settings.get('core.binary_dir', None)
        if binary_dir:
            settings.BINARY_DIR = binary_dir

        # Store the settings to make them available to other modules.
        vcsserver.PYRAMID_SETTINGS = settings_merged
        vcsserver.CONFIG = settings_merged

    def _configure(self):
        self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)

        self.config.add_route('service', '/_service')
        self.config.add_route('status', '/status')
        self.config.add_route('hg_proxy', '/proxy/hg')
        self.config.add_route('git_proxy', '/proxy/git')

        # rpc methods
        self.config.add_route('vcs', '/{backend}')

        # streaming rpc remote methods
        self.config.add_route('vcs_stream', '/{backend}/stream')

        # vcs operations clone/push as streaming
        self.config.add_route('stream_git', '/stream/git/*repo_name')
        self.config.add_route('stream_hg', '/stream/hg/*repo_name')

        self.config.add_view(self.status_view, route_name='status', renderer='json')
        self.config.add_view(self.service_view, route_name='service', renderer='msgpack')

        self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
        self.config.add_view(self.git_proxy(), route_name='git_proxy')
        self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
                             vcs_view=self._remotes)
        self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
                             vcs_view=self._remotes)

        self.config.add_view(self.hg_stream(), route_name='stream_hg')
        self.config.add_view(self.git_stream(), route_name='stream_git')

        self.config.add_view_predicate('vcs_view', VCSViewPredicate)

        self.config.add_notfound_view(not_found, renderer='json')

        self.config.add_view(self.handle_vcs_exception, context=Exception)

        self.config.add_tween(
            'vcsserver.tweens.request_wrapper.RequestWrapperTween',
        )
        self.config.add_request_method(
            'vcsserver.lib.request_counter.get_request_counter',
            'request_count')

    def wsgi_app(self):
        return self.config.make_wsgi_app()

    def _vcs_view_params(self, request):
        remote = self._remotes[request.matchdict['backend']]
        payload = msgpack.unpackb(request.body, use_list=True)

        method = payload.get('method')
        params = payload['params']
        wire = params.get('wire')
        args = params.get('args')
        kwargs = params.get('kwargs')
        context_uid = None

        request.registry.vcs_call_context = {
            'method': method,
            'repo_name': payload.get('_repo_name')
        }

        if wire:
            try:
                wire['context'] = context_uid = uuid.UUID(wire['context'])
            except KeyError:
                pass
            args.insert(0, wire)
        repo_state_uid = wire.get('repo_state_uid') if wire else None

        # NOTE(marcink): trading complexity for slight performance
        if log.isEnabledFor(logging.DEBUG):
            # also we SKIP printing out any of those methods args since they maybe excessive
            just_args_methods = {
                'commitctx': ('content', 'removed', 'updated')
            }
            if method in just_args_methods:
                skip_args = just_args_methods[method]
                call_args = ''
                call_kwargs = {}
                for k in kwargs:
                    if k in skip_args:
                        # replace our skip key with dummy
                        call_kwargs[k] = f'RemovedParam({k})'
                    else:
                        call_kwargs[k] = kwargs[k]
            else:
                call_args = args[1:]
                call_kwargs = kwargs

            log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
                      method, call_args, call_kwargs, context_uid, repo_state_uid)

        statsd = request.registry.statsd
        if statsd:
            statsd.incr(
                'vcsserver_method_total', tags=[
                    f"method:{method}",
                ])
        return payload, remote, method, args, kwargs

    def vcs_view(self, request):

        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        payload_id = payload.get('id')

        try:
            resp = getattr(remote, method)(*args, **kwargs)
        except Exception as e:
            exc_info = list(sys.exc_info())
            exc_type, exc_value, exc_traceback = exc_info

            org_exc = getattr(e, '_org_exc', None)
            org_exc_name = None
            org_exc_tb = ''
            if org_exc:
                org_exc_name = org_exc.__class__.__name__
                org_exc_tb = getattr(e, '_org_exc_tb', '')
                # replace our "faked" exception with our org
                exc_info[0] = org_exc.__class__
                exc_info[1] = org_exc

            should_store_exc = True
            if org_exc:
                def get_exc_fqn(_exc_obj):
                    module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
                    return module_name + '.' + org_exc_name

                exc_fqn = get_exc_fqn(org_exc)

                if exc_fqn in ['mercurial.error.RepoLookupError',
                               'vcsserver.exceptions.RefNotFoundException']:
                    should_store_exc = False

            if should_store_exc:
                store_exception(id(exc_info), exc_info, request_path=request.path)

            tb_info = ''.join(
                traceback.format_exception(exc_type, exc_value, exc_traceback))

            type_ = e.__class__.__name__
            if type_ not in self.ALLOWED_EXCEPTIONS:
                type_ = None

            resp = {
                'id': payload_id,
                'error': {
                    'message': str(e),
                    'traceback': tb_info,
                    'org_exc': org_exc_name,
                    'org_exc_tb': org_exc_tb,
                    'type': type_
                }
            }

            try:
                resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
            except AttributeError:
                pass
        else:
            resp = {
                'id': payload_id,
                'result': resp
            }
        log.debug('Serving data for method %s', method)
        return resp

    def vcs_stream_view(self, request):
        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        # this method has a stream: marker we remove it here
        method = method.split('stream:')[-1]
        chunk_size = safe_int(payload.get('chunk_size')) or 4096

        try:
            resp = getattr(remote, method)(*args, **kwargs)
        except Exception as e:
            raise

        def get_chunked_data(method_resp):
            stream = io.BytesIO(method_resp)
            while 1:
                chunk = stream.read(chunk_size)
                if not chunk:
                    break
                yield chunk

        response = Response(app_iter=get_chunked_data(resp))
        response.content_type = 'application/octet-stream'

        return response

    def status_view(self, request):
        import vcsserver
        return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
                'pid': os.getpid()}

    def service_view(self, request):
        import vcsserver

        payload = msgpack.unpackb(request.body, use_list=True)
        server_config, app_config = {}, {}

        try:
            path = self.global_config['__file__']
            config = configparser.RawConfigParser()

            config.read(path)

            if config.has_section('server:main'):
                server_config = dict(config.items('server:main'))
            if config.has_section('app:main'):
                app_config = dict(config.items('app:main'))

        except Exception:
            log.exception('Failed to read .ini file for display')

        environ = list(os.environ.items())

        resp = {
            'id': payload.get('id'),
            'result': dict(
                version=vcsserver.__version__,
                config=server_config,
                app_config=app_config,
                environ=environ,
                payload=payload,
            )
        }
        return resp

    def _msgpack_renderer_factory(self, info):

        def _render(value, system):
            bin_type = False
            res = value.get('result')
            if res and isinstance(res, BinaryEnvelope):
                log.debug('Result is wrapped in BinaryEnvelope type')
                value['result'] = res.value
                bin_type = res.bin_type

            request = system.get('request')
            if request is not None:
                response = request.response
                ct = response.content_type
                if ct == response.default_content_type:
                    response.content_type = 'application/x-msgpack'
                    if bin_type:
                        response.content_type = 'application/x-msgpack-bin'

            return msgpack.packb(value, use_bin_type=bin_type)
        return _render

    def set_env_from_config(self, environ, config):
        dict_conf = {}
        try:
            for elem in config:
                if elem[0] == 'rhodecode':
                    dict_conf = json.loads(elem[2])
                    break
        except Exception:
            log.exception('Failed to fetch SCM CONFIG')
            return

        username = dict_conf.get('username')
        if username:
            environ['REMOTE_USER'] = username
            # mercurial specific, some extension api rely on this
            environ['HGUSER'] = username

        ip = dict_conf.get('ip')
        if ip:
            environ['REMOTE_HOST'] = ip

        if _is_request_chunked(environ):
            # set the compatibility flag for webob
            environ['wsgi.input_terminated'] = True

    def hg_proxy(self):
        @wsgiapp
        def _hg_proxy(environ, start_response):
            app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
            return app(environ, start_response)
        return _hg_proxy

    def git_proxy(self):
        @wsgiapp
        def _git_proxy(environ, start_response):
            app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
            return app(environ, start_response)
        return _git_proxy

    def hg_stream(self):
        if self._use_echo_app:
            @wsgiapp
            def _hg_stream(environ, start_response):
                app = EchoApp('fake_path', 'fake_name', None)
                return app(environ, start_response)
            return _hg_stream
        else:
            @wsgiapp
            def _hg_stream(environ, start_response):
                log.debug('http-app: handling hg stream')

                packed_cc = base64.b64decode(environ['HTTP_X_RC_VCS_STREAM_CALL_CONTEXT'])
                call_context = msgpack.unpackb(packed_cc)

                repo_path = call_context['repo_path']
                repo_name = call_context['repo_name']
                config = call_context['repo_config']

                app = scm_app.create_hg_wsgi_app(
                    repo_path, repo_name, config)

                # Consistent path information for hgweb
                environ['PATH_INFO'] = call_context['path_info']
                environ['REPO_NAME'] = repo_name
                self.set_env_from_config(environ, config)

                log.debug('http-app: starting app handler '
                          'with %s and process request', app)
                return app(environ, ResponseFilter(start_response))
            return _hg_stream

    def git_stream(self):
        if self._use_echo_app:
            @wsgiapp
            def _git_stream(environ, start_response):
                app = EchoApp('fake_path', 'fake_name', None)
                return app(environ, start_response)
            return _git_stream
        else:
            @wsgiapp
            def _git_stream(environ, start_response):
                log.debug('http-app: handling git stream')

                packed_cc = base64.b64decode(environ['HTTP_X_RC_VCS_STREAM_CALL_CONTEXT'])
                call_context = msgpack.unpackb(packed_cc)

                repo_path = call_context['repo_path']
                repo_name = call_context['repo_name']
                config = call_context['repo_config']

                environ['PATH_INFO'] = call_context['path_info']
                self.set_env_from_config(environ, config)

                content_type = environ.get('CONTENT_TYPE', '')

                path = environ['PATH_INFO']
                is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
                log.debug(
                    'LFS: Detecting if request `%s` is LFS server path based '
                    'on content type:`%s`, is_lfs:%s',
                    path, content_type, is_lfs_request)

                if not is_lfs_request:
                    # fallback detection by path
                    if GIT_LFS_PROTO_PAT.match(path):
                        is_lfs_request = True
                    log.debug(
                        'LFS: fallback detection by path of: `%s`, is_lfs:%s',
                        path, is_lfs_request)

                if is_lfs_request:
                    app = scm_app.create_git_lfs_wsgi_app(
                        repo_path, repo_name, config)
                else:
                    app = scm_app.create_git_wsgi_app(
                        repo_path, repo_name, config)

                log.debug('http-app: starting app handler '
                          'with %s and process request', app)

                return app(environ, start_response)

            return _git_stream

    def handle_vcs_exception(self, exception, request):
        _vcs_kind = getattr(exception, '_vcs_kind', '')
        if _vcs_kind == 'repo_locked':
            # Get custom repo-locked status code if present.
            status_code = request.headers.get('X-RC-Locked-Status-Code')
            return HTTPRepoLocked(
                title=str(exception), status_code=status_code)

        elif _vcs_kind == 'repo_branch_protected':
            # Get custom repo-branch-protected status code if present.
            return HTTPRepoBranchProtected(title=str(exception))

        exc_info = request.exc_info
        store_exception(id(exc_info), exc_info)

        traceback_info = 'unavailable'
        if request.exc_info:
            exc_type, exc_value, exc_tb = request.exc_info
            traceback_info = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

        log.error(
            'error occurred handling this request for path: %s, \n tb: %s',
            request.path, traceback_info)

        statsd = request.registry.statsd
        if statsd:
            exc_type = f"{exception.__class__.__module__}.{exception.__class__.__name__}"
            statsd.incr('vcsserver_exception_total',
                        tags=[f"type:{exc_type}"])
        raise exception


class ResponseFilter(object):

    def __init__(self, start_response):
        self._start_response = start_response

    def __call__(self, status, response_headers, exc_info=None):
        headers = tuple(
            (h, v) for h, v in response_headers
            if not wsgiref.util.is_hop_by_hop(h))
        return self._start_response(status, headers, exc_info)


def sanitize_settings_and_apply_defaults(global_config, settings):
    global_settings_maker = SettingsMaker(global_config)
    settings_maker = SettingsMaker(settings)

    settings_maker.make_setting('logging.autoconfigure', False, parser='bool')

    logging_conf = os.path.join(os.path.dirname(global_config.get('__file__')), 'logging.ini')
    settings_maker.enable_logging(logging_conf)

    # Default includes, possible to change as a user
    pyramid_includes = settings_maker.make_setting('pyramid.includes', [], parser='list:newline')
    log.debug("Using the following pyramid.includes: %s", pyramid_includes)

    settings_maker.make_setting('__file__', global_config.get('__file__'))

    settings_maker.make_setting('pyramid.default_locale_name', 'en')
    settings_maker.make_setting('locale', 'en_US.UTF-8')

    settings_maker.make_setting('core.binary_dir', '')

    temp_store = tempfile.gettempdir()
    default_cache_dir = os.path.join(temp_store, 'rc_cache')
    # save default, cache dir, and use it for all backends later.
    default_cache_dir = settings_maker.make_setting(
        'cache_dir',
        default=default_cache_dir, default_when_empty=True,
        parser='dir:ensured')

    # exception store cache
    settings_maker.make_setting(
        'exception_tracker.store_path',
        default=os.path.join(default_cache_dir, 'exc_store'), default_when_empty=True,
        parser='dir:ensured'
    )

    # repo_object cache defaults
    settings_maker.make_setting(
        'rc_cache.repo_object.backend',
        default='dogpile.cache.rc.file_namespace',
        parser='string')
    settings_maker.make_setting(
        'rc_cache.repo_object.expiration_time',
        default=30 * 24 * 60 * 60,  # 30days
        parser='int')
    settings_maker.make_setting(
        'rc_cache.repo_object.arguments.filename',
        default=os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'),
        parser='string')

    # statsd
    settings_maker.make_setting('statsd.enabled', False, parser='bool')
    settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string')
    settings_maker.make_setting('statsd.statsd_port', 9125, parser='int')
    settings_maker.make_setting('statsd.statsd_prefix', '')
    settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool')

    settings_maker.env_expand()


def main(global_config, **settings):
    start_time = time.time()
    log.info('Pyramid app config starting')

    if MercurialFactory:
        hgpatches.patch_largefiles_capabilities()
        hgpatches.patch_subrepo_type_mapping()

    # Fill in and sanitize the defaults & do ENV expansion
    sanitize_settings_and_apply_defaults(global_config, settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    pyramid_app = HTTPApplication(settings=settings, global_config=global_config).wsgi_app()
    total_time = time.time() - start_time
    log.info('Pyramid app `%s` created and configured in %.2fs',
             getattr(pyramid_app, 'func_name', 'pyramid_app'), total_time)
    return pyramid_app