# -*- coding: utf-8 -*-

# Copyright (C) 2010-2017 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import json

from mock import patch
import pytest
from pylons import tmpl_context as c

import rhodecode
from rhodecode.lib.utils import map_groups
from rhodecode.model.db import Repository, User, RepoGroup
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.settings import SettingsModel
from rhodecode.tests import TestController, url, TEST_USER_ADMIN_LOGIN
from rhodecode.tests.fixture import Fixture


# Module-level helper from ``rhodecode.tests.fixture``; the tests below use it
# to create repositories / repository groups and to toggle anonymous access.
fixture = Fixture()


class TestHomeController(TestController):
    """Functional tests for the ``index`` action of the ``home`` controller
    and for the repository group home page."""

    def test_index(self):
        """The index page shows 'Add Repository' and lists every repo."""
        self.log_user()
        response = self.app.get(url(controller='home', action='index'))
        # if global permission is set
        response.mustcontain('Add Repository')

        # search for objects inside the JavaScript JSON
        for repo in Repository.get_all():
            response.mustcontain('"name_raw": "%s"' % repo.repo_name)

    def test_index_contains_statics_with_ver(self):
        """Static asset links carry the ``?ver=<version hash>`` suffix."""
        self.log_user()
        response = self.app.get(url(controller='home', action='index'))

        # the template context is read after the request has been rendered
        version_hash = c.rhodecode_version_hash
        response.mustcontain('style.css?ver={0}'.format(version_hash))
        response.mustcontain(
            'rhodecode-components.js?ver={0}'.format(version_hash))

    def test_index_contains_backend_specific_details(self, backend):
        """The index page links the backend repo and its latest commit."""
        self.log_user()
        response = self.app.get(url(controller='home', action='index'))
        tip = backend.repo.get_commit().raw_id

        # html in javascript variable:
        response.mustcontain(r'<i class=\"icon-%s\"' % (backend.alias, ))
        response.mustcontain(r'href=\"/%s\"' % (backend.repo_name, ))

        response.mustcontain("""/%s/changeset/%s""" % (backend.repo_name, tip))
        # presumably the message of the tip commit in the test repositories
        response.mustcontain("""Added a symlink""")

    def test_index_with_anonymous_access_disabled(self):
        """Without anonymous access the index redirects to the login page."""
        with fixture.anon_access(False):
            response = self.app.get(url(controller='home', action='index'),
                                    status=302)
            assert 'login' in response.location

    def test_index_page_on_groups(self, autologin_user, repo_group):
        """A repository group page lists the repositories it contains."""
        response = self.app.get(url('repo_group_home', group_name='gr1'))
        response.mustcontain("gr1/repo_in_group")

    def test_index_page_on_group_with_trailing_slash(
            self, autologin_user, repo_group):
        """The group page is also served when the URL ends with a slash."""
        response = self.app.get(url('repo_group_home', group_name='gr1') + '/')
        response.mustcontain("gr1/repo_in_group")

    @pytest.fixture(scope='class')
    def repo_group(self, request):
        """Create group ``gr1`` with one repository; remove both afterwards."""
        gr = fixture.create_repo_group('gr1')
        fixture.create_repo(name='gr1/repo_in_group', repo_group=gr)

        @request.addfinalizer
        def cleanup():
            RepoModel().delete('gr1/repo_in_group')
            RepoGroupModel().delete(repo_group='gr1', force_delete=True)
            Session().commit()

    def test_index_with_name_with_tags(self, autologin_user):
        """HTML placed in the user's first/last name is rendered escaped.

        The original names are restored afterwards so that the modified
        ``test_admin`` account does not leak into other tests.
        """
        user = User.get_by_username('test_admin')
        original_names = (user.name, user.lastname)
        user.name = '<img src="/image1" onload="alert(\'Hello, World!\');">'
        user.lastname = (
            '<img src="/image2" onload="alert(\'Hello, World!\');">')
        Session().add(user)
        Session().commit()

        try:
            response = self.app.get(url(controller='home', action='index'))
            response.mustcontain(
                '&lt;img src=&#34;/image1&#34; onload=&#34;'
                'alert(&#39;Hello, World!&#39;);&#34;&gt;')
            response.mustcontain(
                '&lt;img src=&#34;/image2&#34; onload=&#34;'
                'alert(&#39;Hello, World!&#39;);&#34;&gt;')
        finally:
            # Load the user again instead of reusing the instance from above:
            # it may no longer be attached to the current session once the
            # request has been processed.
            user = User.get_by_username('test_admin')
            user.name, user.lastname = original_names
            Session().add(user)
            Session().commit()

    @pytest.mark.parametrize("name, state", [
        ('Disabled', False),
        ('Enabled', True),
    ])
    def test_index_show_version(self, autologin_user, name, state):
        """The version string is rendered only when ``show_version`` is on."""
        version_string = 'RhodeCode Enterprise %s' % rhodecode.__version__

        sett = SettingsModel().create_or_update_setting(
            'show_version', state, 'bool')
        Session().add(sett)
        Session().commit()
        # make sure the request below sees the freshly stored value
        SettingsModel().invalidate_settings_cache()

        response = self.app.get(url(controller='home', action='index'))
        if state:
            response.mustcontain(version_string)
        else:
            response.mustcontain(no=[version_string])


class TestUserAutocompleteData(TestController):
    """Tests for the ``user_autocomplete_data`` action of ``home``."""

    def _request_autocomplete(self, **url_params):
        """GET the autocomplete action as an XHR call, expecting HTTP 200."""
        endpoint = url(
            controller='home', action='user_autocomplete_data', **url_params)
        xhr_headers = {'X-REQUESTED-WITH': 'XMLHttpRequest', }
        return self.app.get(endpoint, headers=xhr_headers, status=200)

    @staticmethod
    def _suggestions(response):
        """Return the ``suggestions`` list from the JSON response body."""
        return json.loads(response.body)['suggestions']

    def test_returns_list_of_users(self, user_util):
        """An active user shows up among the suggested values."""
        self.log_user()
        expected_name = user_util.create_user(is_active=True).username

        response = self._request_autocomplete()

        suggested = [entry['value'] for entry in self._suggestions(response)]
        assert expected_name in suggested

    def test_returns_inactive_users_when_active_flag_sent(self, user_util):
        """With ``active='0'`` an inactive user is suggested as well."""
        self.log_user()
        expected_name = user_util.create_user(is_active=False).username

        response = self._request_autocomplete(user_groups='true', active='0')

        suggested = [entry['value'] for entry in self._suggestions(response)]
        assert expected_name in suggested

    def test_returns_groups_when_user_groups_sent(self, user_util):
        """With ``user_groups='true'`` user groups are suggested too."""
        self.log_user()
        created_group = user_util.create_user_group(user_groups_active=True)
        expected_name = created_group.users_group_name

        response = self._request_autocomplete(user_groups='true')

        suggested = [entry['value'] for entry in self._suggestions(response)]
        assert expected_name in suggested

    def test_result_is_limited_when_query_is_sent(self):
        """A query is passed on to both lookups; 10 + 10 mocked entries
        produce 20 suggestions."""
        self.log_user()

        fake_result = []
        for i in range(10):
            fake_result.append({
                'first_name': 'John',
                'value_display': 'hello{} (John Smith)'.format(i),
                'icon_link': '/images/user14.png',
                'value': 'hello{}'.format(i),
                'last_name': 'Smith',
                'username': 'hello{}'.format(i),
                'id': i,
                'value_type': u'user'
            })

        query = 'hello'
        # both model lookups are replaced so they return the same fake entries
        with patch.object(
                RepoModel, 'get_users',
                return_value=fake_result) as users_mock:
            with patch.object(
                    RepoModel, 'get_user_groups',
                    return_value=fake_result) as groups_mock:
                response = self._request_autocomplete(
                    user_groups='true', query=query)

        suggestions = self._suggestions(response)
        users_mock.assert_called_once_with(
            name_contains=query, only_active=True)
        groups_mock.assert_called_once_with(
            name_contains=query, only_active=True)
        assert len(suggestions) == 20


def assert_and_get_content(result):
    """Validate switcher/list results and split their entries by type.

    :param result: iterable of sections, each a mapping with a ``children``
        list of entry dicts.
    :returns: tuple ``(repos, groups, commits)`` of the entries whose
        ``type`` is ``'repo'``, ``'group'`` and ``'commit'`` respectively,
        in the order they were encountered.
    :raises AssertionError: when an entry has a falsy ``id``, ``text`` or
        ``url``.
    :raises Exception: when an entry has any other ``type``.
    """
    repos, groups, commits = [], [], []
    buckets = (('repo', repos), ('group', groups), ('commit', commits))

    for section in result:
        for entry in section['children']:
            # every entry has to carry these keys with a truthy value
            for mandatory_key in ('id', 'text', 'url'):
                assert entry[mandatory_key]

            entry_type = entry['type']
            for type_name, bucket in buckets:
                if entry_type == type_name:
                    bucket.append(entry)
                    break
            else:
                raise Exception('invalid type %s' % entry_type)

    return repos, groups, commits


class TestGotoSwitcherData(TestController):
    """Tests for the ``goto_switcher_data`` action of ``home``.

    A fixed set of repositories (some nested in repository groups) is
    created once for the whole class and removed again afterwards.
    """

    # repository names; any leading path segments are repository groups
    required_repos_with_groups = [
        'abc',
        'abc-fork',
        'forks/abcd',
        'abcd',
        'abcde',
        'a/abc',
        'aa/abc',
        'aaa/abc',
        'aaaa/abc',
        'repos_abc/aaa/abc',
        'abc_repos/abc',
        'abc_repos/abcd',
        'xxx/xyz',
        'forked-abc/a/abc'
    ]

    @pytest.fixture(autouse=True, scope='class')
    def prepare(self, request, pylonsapp):
        """Create all entries of ``required_repos_with_groups``."""
        # Register the cleanup before anything is created: should one of the
        # creations below fail, whatever exists by then still gets removed.
        # ``cleanup`` skips repositories and groups that are not present.
        request.addfinalizer(self.cleanup)

        for repo_and_group in self.required_repos_with_groups:
            # create structure of groups and return the last group
            repo_group = map_groups(repo_and_group)

            # ``repo_group`` may have no ``group_id`` (top-level repository),
            # in which case ``None`` is passed on
            RepoModel()._create_repo(
                repo_and_group, 'hg', 'test-ac', TEST_USER_ADMIN_LOGIN,
                repo_group=getattr(repo_group, 'group_id', None))

            Session().commit()

    def cleanup(self):
        """Delete the repositories and groups created by ``prepare``."""
        # first delete all repos
        for repo_and_groups in self.required_repos_with_groups:
            repo = Repository.get_by_repo_name(repo_and_groups)
            if repo:
                RepoModel().delete(repo)
                Session().commit()

        # then delete all empty groups
        for repo_and_groups in self.required_repos_with_groups:
            if '/' not in repo_and_groups:
                continue

            r_group = repo_and_groups.rsplit('/', 1)[0]
            repo_group = RepoGroup.get_by_group_name(r_group)
            if not repo_group:
                # not created, or already removed in an earlier iteration
                continue

            # remember the parents before the group itself is deleted
            parents = repo_group.parents
            RepoGroupModel().delete(repo_group, force_delete=True)
            Session().commit()

            for el in reversed(parents):
                RepoGroupModel().delete(el, force_delete=True)
                Session().commit()

    def _fetch_content(self, params=None):
        """Call ``goto_switcher_data`` via XHR and split the results.

        :param params: optional query parameters for the request.
        :returns: ``(repos, groups, commits)`` as produced by
            ``assert_and_get_content``.
        """
        response = self.app.get(
            url(controller='home', action='goto_switcher_data'),
            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', },
            params=params, status=200)
        result = json.loads(response.body)['results']
        return assert_and_get_content(result)

    def test_returns_list_of_repos_and_groups(self):
        """Without a query all repositories and groups are returned."""
        self.log_user()

        repos, groups, commits = self._fetch_content()

        assert len(repos) == len(Repository.get_all())
        assert len(groups) == len(RepoGroup.get_all())
        assert len(commits) == 0

    def test_returns_list_of_repos_and_groups_filtered(self):
        """The query ``abc`` narrows the result down to matching entries."""
        self.log_user()

        repos, groups, commits = self._fetch_content({'query': 'abc'})

        assert len(repos) == 13
        assert len(groups) == 5
        assert len(commits) == 0

    def test_returns_list_of_properly_sorted_and_filtered(self):
        """Filtered results come back in the expected order."""
        self.log_user()

        repos, groups, commits = self._fetch_content({'query': 'abc'})

        test_repos = [x['text'] for x in repos[:4]]
        assert ['abc', 'abcd', 'a/abc', 'abcde'] == test_repos

        test_groups = [x['text'] for x in groups[:4]]
        assert ['abc_repos', 'repos_abc',
                'forked-abc', 'forked-abc/a'] == test_groups


class TestRepoListData(TestController):
    """Tests for the ``repo_list_data`` action of ``home``."""

    def _fetch_content(self, params=None):
        """Call ``repo_list_data`` via XHR and split the results.

        :param params: optional query parameters for the request.
        :returns: ``(repos, groups, commits)`` as produced by
            ``assert_and_get_content``.
        """
        response = self.app.get(
            url(controller='home', action='repo_list_data'),
            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', },
            params=params, status=200)
        result = json.loads(response.body)['results']
        return assert_and_get_content(result)

    def test_returns_list_of_repos_and_groups(self):
        """Without a query every repository and nothing else is returned."""
        self.log_user()

        repos, groups, commits = self._fetch_content()

        assert len(repos) == len(Repository.get_all())
        assert len(groups) == 0
        assert len(commits) == 0

    def test_returns_list_of_repos_and_groups_filtered(self):
        """A query limits the result to repositories with a matching name."""
        self.log_user()

        repos, groups, commits = self._fetch_content(
            {'query': 'vcs_test_git'})

        expected_repos = Repository.query().filter(
            Repository.repo_name.ilike('%vcs_test_git%')).all()
        assert len(repos) == len(expected_repos)
        assert len(groups) == 0
        assert len(commits) == 0

    def test_returns_list_of_repos_and_groups_filtered_with_type(self):
        """``repo_type`` restricts the result to repositories of that type."""
        self.log_user()

        repos, groups, commits = self._fetch_content(
            {'query': 'vcs_test_git', 'repo_type': 'git'})

        # The expectation applies the same type restriction as the request;
        # otherwise a matching repository of another type would be counted.
        expected_repos = Repository.query().filter(
            Repository.repo_name.ilike('%vcs_test_git%')).filter(
            Repository.repo_type == 'git').all()
        assert len(repos) == len(expected_repos)
        assert len(groups) == 0
        assert len(commits) == 0

    def test_returns_list_of_repos_non_ascii_query(self):
        """A non-ASCII query is accepted and yields an empty result."""
        self.log_user()

        repos, groups, commits = self._fetch_content(
            {'query': 'ć_vcs_test_ą', 'repo_type': 'git'})

        assert len(repos) == 0
        assert len(groups) == 0
        assert len(commits) == 0