# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016  RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import stat
import sys

import pytest
from mock import Mock, patch, DEFAULT

import rhodecode
from rhodecode.model import db, scm


def test_scm_instance_config(backend):
    repo = backend.create_repo()
    with patch.multiple('rhodecode.model.db.Repository',
                        _get_instance=DEFAULT,
                        _get_instance_cached=DEFAULT) as mocks:
        repo.scm_instance()
        mocks['_get_instance'].assert_called_with(
            config=None, cache=False)

        config = {'some': 'value'}
        repo.scm_instance(config=config)
        mocks['_get_instance'].assert_called_with(
            config=config, cache=False)

        with patch.dict(rhodecode.CONFIG, {'vcs_full_cache': 'true'}):
            repo.scm_instance(config=config)
            mocks['_get_instance_cached'].assert_called()


def test__get_instance_config(backend):
    repo = backend.create_repo()
    vcs_class = Mock()
    with patch.multiple('rhodecode.lib.vcs.backends',
                        get_scm=DEFAULT,
                        get_backend=DEFAULT) as mocks:
        mocks['get_scm'].return_value = backend.alias
        mocks['get_backend'].return_value = vcs_class
        with patch('rhodecode.model.db.Repository._config') as config_mock:
            repo._get_instance()
            vcs_class.assert_called_with(
                repo_path=repo.repo_full_path, config=config_mock,
                create=False, with_wire={'cache': True})

        new_config = {'override': 'old_config'}
        repo._get_instance(config=new_config)
        vcs_class.assert_called_with(
            repo_path=repo.repo_full_path, config=new_config, create=False,
            with_wire={'cache': True})


def test_mark_for_invalidation_config(backend):
    repo = backend.create_repo()
    with patch('rhodecode.model.db.Repository.update_commit_cache') as _mock:
        scm.ScmModel().mark_for_invalidation(repo.repo_name)
        _, kwargs = _mock.call_args
        assert kwargs['config'].__dict__ == repo._config.__dict__


def test_mark_for_invalidation_with_delete_updates_last_commit(backend):
    commits = [{'message': 'A'}, {'message': 'B'}]
    repo = backend.create_repo(commits=commits)
    scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
    assert repo.changeset_cache['revision'] == 1


def test_mark_for_invalidation_with_delete_updates_last_commit_empty(backend):
    repo = backend.create_repo()
    scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
    assert repo.changeset_cache['revision'] == -1


def test_strip_with_multiple_heads(backend_hg):
    commits = [
        {'message': 'A'},
        {'message': 'a'},
        {'message': 'b'},
        {'message': 'B', 'parents': ['A']},
        {'message': 'a1'},
    ]
    repo = backend_hg.create_repo(commits=commits)
    commit_ids = backend_hg.commit_ids

    model = scm.ScmModel()
    model.strip(repo, commit_ids['b'], branch=None)

    vcs_repo = repo.scm_instance()
    rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
    assert len(rest_commit_ids) == 4
    assert commit_ids['b'] not in rest_commit_ids


def test_strip_with_single_heads(backend_hg):
    commits = [
        {'message': 'A'},
        {'message': 'a'},
        {'message': 'b'},
    ]
    repo = backend_hg.create_repo(commits=commits)
    commit_ids = backend_hg.commit_ids

    model = scm.ScmModel()
    model.strip(repo, commit_ids['b'], branch=None)

    vcs_repo = repo.scm_instance()
    rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
    assert len(rest_commit_ids) == 2
    assert commit_ids['b'] not in rest_commit_ids


def test_get_nodes_returns_unicode_flat(backend_random):
    repo = backend_random.repo
    directories, files = scm.ScmModel().get_nodes(
        repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
        flat=True)
    assert_contains_only_unicode(directories)
    assert_contains_only_unicode(files)


def test_get_nodes_returns_unicode_non_flat(backend_random):
    repo = backend_random.repo
    directories, files = scm.ScmModel().get_nodes(
        repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
        flat=False)
    # johbo: Checking only the names for now, since that is the critical
    # part.
    assert_contains_only_unicode([d['name'] for d in directories])
    assert_contains_only_unicode([f['name'] for f in files])


def test_get_nodes_max_file_bytes(backend_random):
    repo = backend_random.repo
    max_file_bytes = 10
    directories, files = scm.ScmModel().get_nodes(
        repo.repo_name, repo.get_commit(commit_idx=0).raw_id, content=True,
        extended_info=True, flat=False)
    assert any(file['content'] and len(file['content']) > max_file_bytes
               for file in files)

    directories, files = scm.ScmModel().get_nodes(
        repo.repo_name, repo.get_commit(commit_idx=0).raw_id, content=True,
        extended_info=True, flat=False, max_file_bytes=max_file_bytes)
    assert all(
        file['content'] is None if file['size'] > max_file_bytes else True
        for file in files)


def assert_contains_only_unicode(structure):
    assert structure
    for value in structure:
        assert isinstance(value, unicode)


@pytest.mark.backends("hg", "git")
def test_get_non_unicode_reference(backend):
    model = scm.ScmModel()
    non_unicode_list = ["Adını".decode("cp1254")]

    def scm_instance():
        return Mock(
            branches=non_unicode_list, bookmarks=non_unicode_list,
            tags=non_unicode_list, alias=backend.alias)

    repo = Mock(__class__=db.Repository, scm_instance=scm_instance)
    choices, __ = model.get_repo_landing_revs(repo=repo)
    if backend.alias == 'hg':
        valid_choices = [
            'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
            u'book:Ad\xc4\xb1n\xc4\xb1', u'tag:Ad\xc4\xb1n\xc4\xb1']
    else:
        valid_choices = [
            'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
            u'tag:Ad\xc4\xb1n\xc4\xb1']

    assert choices == valid_choices


class TestInstallSvnHooks(object):
    HOOK_FILES = ('pre-commit', 'post-commit')

    def test_new_hooks_are_created(self, backend_svn):
        model = scm.ScmModel()
        repo = backend_svn.create_repo()
        vcs_repo = repo.scm_instance()
        model.install_svn_hooks(vcs_repo)

        hooks_path = os.path.join(vcs_repo.path, 'hooks')
        assert os.path.isdir(hooks_path)
        for file_name in self.HOOK_FILES:
            file_path = os.path.join(hooks_path, file_name)
            self._check_hook_file_mode(file_path)
            self._check_hook_file_content(file_path)

    def test_rc_hooks_are_replaced(self, backend_svn):
        model = scm.ScmModel()
        repo = backend_svn.create_repo()
        vcs_repo = repo.scm_instance()
        hooks_path = os.path.join(vcs_repo.path, 'hooks')
        file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]

        for file_path in file_paths:
            self._create_fake_hook(
                file_path, content="RC_HOOK_VER = 'abcde'\n")

        model.install_svn_hooks(vcs_repo)

        for file_path in file_paths:
            self._check_hook_file_content(file_path)

    def test_non_rc_hooks_are_not_replaced_without_force_create(
            self, backend_svn):
        model = scm.ScmModel()
        repo = backend_svn.create_repo()
        vcs_repo = repo.scm_instance()
        hooks_path = os.path.join(vcs_repo.path, 'hooks')
        file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
        non_rc_content = "exit 0\n"

        for file_path in file_paths:
            self._create_fake_hook(file_path, content=non_rc_content)

        model.install_svn_hooks(vcs_repo)

        for file_path in file_paths:
            with open(file_path, 'rt') as hook_file:
                content = hook_file.read()
            assert content == non_rc_content

    def test_non_rc_hooks_are_replaced_with_force_create(self, backend_svn):
        model = scm.ScmModel()
        repo = backend_svn.create_repo()
        vcs_repo = repo.scm_instance()
        hooks_path = os.path.join(vcs_repo.path, 'hooks')
        file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
        non_rc_content = "exit 0\n"

        for file_path in file_paths:
            self._create_fake_hook(file_path, content=non_rc_content)

        model.install_svn_hooks(vcs_repo, force_create=True)

        for file_path in file_paths:
            self._check_hook_file_content(file_path)

    def _check_hook_file_mode(self, file_path):
        assert os.path.exists(file_path)
        stat_info = os.stat(file_path)

        file_mode = stat.S_IMODE(stat_info.st_mode)
        expected_mode = int('755', 8)
        assert expected_mode == file_mode

    def _check_hook_file_content(self, file_path):
        with open(file_path, 'rt') as hook_file:
            content = hook_file.read()

        expected_env = '#!{}'.format(sys.executable)
        expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(
            rhodecode.__version__)
        assert content.strip().startswith(expected_env)
        assert expected_rc_version in content

    def _create_fake_hook(self, file_path, content):
        with open(file_path, 'w') as hook_file:
            hook_file.write(content)


class TestCheckRhodecodeHook(object):

    @patch('os.path.exists', Mock(return_value=False))
    def test_returns_true_when_no_hook_found(self):
        result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
        assert result

    @pytest.mark.parametrize("file_content, expected_result", [
        ("RC_HOOK_VER = '3.3.3'\n", True),
        ("RC_HOOK = '3.3.3'\n", False),
    ])
    @patch('os.path.exists', Mock(return_value=True))
    def test_signatures(self, file_content, expected_result):
        hook_content_patcher = patch.object(
            scm, '_read_hook', return_value=file_content)
        with hook_content_patcher:
            result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')

        assert result is expected_result


class TestInstallHooks(object):
    def test_hooks_are_installed_for_git_repo(self, backend_git):
        repo = backend_git.create_repo()
        model = scm.ScmModel()
        scm_repo = repo.scm_instance()
        with patch.object(model, 'install_git_hook') as hooks_mock:
            model.install_hooks(scm_repo, repo_type='git')
        hooks_mock.assert_called_once_with(scm_repo)

    def test_hooks_are_installed_for_svn_repo(self, backend_svn):
        repo = backend_svn.create_repo()
        scm_repo = repo.scm_instance()
        model = scm.ScmModel()
        with patch.object(scm.ScmModel, 'install_svn_hooks') as hooks_mock:
            model.install_hooks(scm_repo, repo_type='svn')
        hooks_mock.assert_called_once_with(scm_repo)

    @pytest.mark.parametrize('hook_method', [
        'install_svn_hooks',
        'install_git_hook'])
    def test_mercurial_doesnt_trigger_hooks(self, backend_hg, hook_method):
        repo = backend_hg.create_repo()
        scm_repo = repo.scm_instance()
        model = scm.ScmModel()
        with patch.object(scm.ScmModel, hook_method) as hooks_mock:
            model.install_hooks(scm_repo, repo_type='hg')
        assert hooks_mock.call_count == 0