# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016  RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import tempfile

import mock
import pytest

from rhodecode.lib.exceptions import AttachedForksError
from rhodecode.lib.utils import make_db_config
from rhodecode.model.db import Repository
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.scm import ScmModel
from rhodecode.lib.utils2 import safe_unicode


class TestRepoModel:

    def test_remove_repo(self, backend):
        repo = backend.create_repo()
        Session().commit()
        RepoModel().delete(repo=repo)
        Session().commit()

        repos = ScmModel().repo_scan()

        assert Repository.get_by_repo_name(repo_name=backend.repo_name) is None
        assert repo.repo_name not in repos

    def test_remove_repo_raises_exc_when_attached_forks(self, backend):
        repo = backend.create_repo()
        Session().commit()
        backend.create_fork()
        Session().commit()

        with pytest.raises(AttachedForksError):
            RepoModel().delete(repo=repo)

    def test_remove_repo_delete_forks(self, backend):
        repo = backend.create_repo()
        Session().commit()

        fork = backend.create_fork()
        Session().commit()

        fork_of_fork = backend.create_fork()
        Session().commit()

        RepoModel().delete(repo=repo, forks='delete')
        Session().commit()

        assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None
        assert Repository.get_by_repo_name(repo_name=fork.repo_name) is None
        assert (
            Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name)
            is None)

    def test_remove_repo_detach_forks(self, backend):
        repo = backend.create_repo()
        Session().commit()

        fork = backend.create_fork()
        Session().commit()

        fork_of_fork = backend.create_fork()
        Session().commit()

        RepoModel().delete(repo=repo, forks='detach')
        Session().commit()

        assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None
        assert (
            Repository.get_by_repo_name(repo_name=fork.repo_name) is not None)
        assert (
            Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name)
            is not None)

    @pytest.mark.parametrize("filename, expected", [
        ("README", True),
        ("README.rst", False),
    ])
    def test_filenode_is_link(self, vcsbackend, filename, expected):
        repo = vcsbackend.repo
        assert repo.get_commit().is_link(filename) is expected

    def test_get_commit(self, backend):
        backend.repo.get_commit()

    def test_get_changeset_is_deprecated(self, backend):
        repo = backend.repo
        pytest.deprecated_call(repo.get_changeset)

    def test_clone_url_encrypted_value(self, backend):
        repo = backend.create_repo()
        Session().commit()

        repo.clone_url = 'https://marcink:qweqwe@code.rhodecode.com'
        Session().add(repo)
        Session().commit()

        assert repo.clone_url == 'https://marcink:qweqwe@code.rhodecode.com'

    @pytest.mark.backends("git", "svn")
    def test_create_filesystem_repo_installs_hooks(self, tmpdir, backend):
        hook_methods = {
            'git': 'install_git_hook',
            'svn': 'install_svn_hooks'
        }
        repo = backend.create_repo()
        repo_name = repo.repo_name
        model = RepoModel()
        repo_location = tempfile.mkdtemp()
        model.repos_path = repo_location
        method = hook_methods[backend.alias]
        with mock.patch.object(ScmModel, method) as hooks_mock:
            model._create_filesystem_repo(
                repo_name, backend.alias, repo_group='', clone_uri=None)
        assert hooks_mock.call_count == 1
        hook_args, hook_kwargs = hooks_mock.call_args
        assert hook_args[0].name == repo_name

    @pytest.mark.parametrize("use_global_config, repo_name_passed", [
        (True, False),
        (False, True)
    ])
    def test_per_repo_config_is_generated_during_filesystem_repo_creation(
            self, tmpdir, backend, use_global_config, repo_name_passed):
        repo_name = 'test-{}-repo-{}'.format(backend.alias, use_global_config)
        config = make_db_config()
        model = RepoModel()
        with mock.patch('rhodecode.model.repo.make_db_config') as config_mock:
            config_mock.return_value = config
            model._create_filesystem_repo(
                repo_name, backend.alias, repo_group='', clone_uri=None,
                use_global_config=use_global_config)
        expected_repo_name = repo_name if repo_name_passed else None
        expected_call = mock.call(clear_session=False, repo=expected_repo_name)
        assert expected_call in config_mock.call_args_list

    def test_update_commit_cache_with_config(serf, backend):
        repo = backend.create_repo()
        with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm:
            scm_instance = mock.Mock()
            scm_instance.get_commit.return_value = {
                'raw_id': 40*'0',
                'revision': 1
            }
            scm.return_value = scm_instance
            repo.update_commit_cache()
            scm.assert_called_with(cache=False, config=None)
            config = {'test': 'config'}
            repo.update_commit_cache(config=config)
            scm.assert_called_with(
                cache=False, config=config)


class TestGetUsers(object):
    def test_returns_active_users(self, backend, user_util):
        for i in range(4):
            is_active = i % 2 == 0
            user_util.create_user(active=is_active, lastname='Fake user')

        with mock.patch('rhodecode.lib.helpers.gravatar_url'):
            users = RepoModel().get_users()
        fake_users = [u for u in users if u['last_name'] == 'Fake user']
        assert len(fake_users) == 2

        expected_keys = (
            'id', 'first_name', 'last_name', 'username', 'icon_link',
            'value_display', 'value', 'value_type')
        for user in users:
            assert user['value_type'] is 'user'
            for key in expected_keys:
                assert key in user

    def test_returns_user_filtered_by_last_name(self, backend, user_util):
        keywords = ('aBc', u'ünicode')
        for keyword in keywords:
            for i in range(2):
                user_util.create_user(
                    active=True, lastname=u'Fake {} user'.format(keyword))

        with mock.patch('rhodecode.lib.helpers.gravatar_url'):
            keyword = keywords[1].lower()
            users = RepoModel().get_users(name_contains=keyword)

        fake_users = [u for u in users if u['last_name'].startswith('Fake')]
        assert len(fake_users) == 2
        for user in fake_users:
            assert user['last_name'] == safe_unicode('Fake ünicode user')

    def test_returns_user_filtered_by_first_name(self, backend, user_util):
        created_users = []
        keywords = ('aBc', u'ünicode')
        for keyword in keywords:
            for i in range(2):
                created_users.append(user_util.create_user(
                    active=True, lastname='Fake user',
                    firstname=u'Fake {} user'.format(keyword)))

        keyword = keywords[1].lower()
        with mock.patch('rhodecode.lib.helpers.gravatar_url'):
            users = RepoModel().get_users(name_contains=keyword)

        fake_users = [u for u in users if u['last_name'].startswith('Fake')]
        assert len(fake_users) == 2
        for user in fake_users:
            assert user['first_name'] == safe_unicode('Fake ünicode user')

    def test_returns_user_filtered_by_username(self, backend, user_util):
        created_users = []
        for i in range(5):
            created_users.append(user_util.create_user(
                active=True, lastname='Fake user'))

        user_filter = created_users[-1].username[-2:]
        with mock.patch('rhodecode.lib.helpers.gravatar_url'):
            users = RepoModel().get_users(name_contains=user_filter)

        fake_users = [u for u in users if u['last_name'].startswith('Fake')]
        assert len(fake_users) == 1
        assert fake_users[0]['username'] == created_users[-1].username

    def test_returns_limited_user_list(self, backend, user_util):
        created_users = []
        for i in range(5):
            created_users.append(user_util.create_user(
                active=True, lastname='Fake user'))

        with mock.patch('rhodecode.lib.helpers.gravatar_url'):
            users = RepoModel().get_users(name_contains='Fake', limit=3)

        fake_users = [u for u in users if u['last_name'].startswith('Fake')]
        assert len(fake_users) == 3


class TestGetUserGroups(object):
    def test_returns_filtered_list(self, backend, user_util):
        created_groups = []
        for i in range(4):
            created_groups.append(
                user_util.create_user_group(users_group_active=True))

        group_filter = created_groups[-1].users_group_name[-2:]
        with self._patch_user_group_list():
            groups = RepoModel().get_user_groups(group_filter)

        fake_groups = [
            u for u in groups if u['value'].startswith('test_returns')]
        assert len(fake_groups) == 1
        assert fake_groups[0]['value'] == created_groups[-1].users_group_name
        assert fake_groups[0]['value_display'].startswith(
            'Group: test_returns')

    def test_returns_limited_list(self, backend, user_util):
        created_groups = []
        for i in range(3):
            created_groups.append(
                user_util.create_user_group(users_group_active=True))

        with self._patch_user_group_list():
            groups = RepoModel().get_user_groups('test_returns')

        fake_groups = [
            u for u in groups if u['value'].startswith('test_returns')]
        assert len(fake_groups) == 3

    def test_returns_active_user_groups(self, backend, user_util):
        for i in range(4):
            is_active = i % 2 == 0
            user_util.create_user_group(users_group_active=is_active)

        with self._patch_user_group_list():
            groups = RepoModel().get_user_groups()
        expected = ('id', 'icon_link', 'value_display', 'value', 'value_type')
        for group in groups:
            assert group['value_type'] is 'user_group'
            for key in expected:
                assert key in group

        fake_groups = [
            u for u in groups if u['value'].startswith('test_returns')]
        assert len(fake_groups) == 2
        for user in fake_groups:
            assert user['value_display'].startswith('Group: test_returns')

    def _patch_user_group_list(self):
        def side_effect(group_list, perm_set):
            return group_list
        return mock.patch(
            'rhodecode.model.repo.UserGroupList', side_effect=side_effect)