# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
Tests so called "in memory commits" commit API of vcs.
"""
import datetime

import pytest

from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.vcs.exceptions import (
    EmptyRepositoryError, NodeAlreadyAddedError, NodeAlreadyExistsError,
    NodeAlreadyRemovedError, NodeAlreadyChangedError, NodeDoesNotExistError,
    NodeNotChangedError)
from rhodecode.lib.vcs.nodes import DirNode, FileNode
from rhodecode.tests.vcs.conftest import BackendTestMixin


@pytest.fixture()
def nodes():
    nodes = [
        FileNode(b'foobar', content=b'Foo & bar'),
        FileNode(b'foobar2', content=b'Foo & bar, doubled!'),
        FileNode(b'foo bar with spaces', content=b''),
        FileNode(b'foo/bar/baz', content=b'Inside'),
        FileNode(b'foo/bar/file.bin', content=(
                b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00'
                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe'
                b'\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
                b'\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00'
                b'\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00'
                b'\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff'
                b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
            )
        ),
    ]
    return nodes


@pytest.mark.usefixtures("vcs_repository_support")
class TestInMemoryCommit(BackendTestMixin):
    """
    This is a backend independent test case class which should be created
    with ``type`` method.

    It is required to set following attributes at subclass:

    - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
    """

    @classmethod
    def _get_commits(cls):
        return []

    def test_add(self, nodes):
        for node in nodes:
            self.imc.add(node)

        self.commit()
        self.assert_successful_commit(nodes)

    @pytest.mark.backends("hg")
    def test_add_on_branch_hg(self, nodes):
        for node in nodes:
            self.imc.add(node)
        self.commit(branch='stable')
        self.assert_successful_commit(nodes)

    @pytest.mark.backends("git")
    def test_add_on_branch_git(self, nodes):
        for node in nodes:
            self.imc.add(node)
        self.commit(branch='stable')
        self.assert_successful_commit(nodes)

    def test_add_in_bulk(self, nodes):
        self.imc.add(*nodes)

        self.commit()
        self.assert_successful_commit(nodes)

    def test_add_non_ascii_files(self):
        nodes = [
            FileNode(safe_bytes('żółwik/zwierzątko_utf8_str'),
                     content=safe_bytes('ćććć')),
            FileNode(safe_bytes('żółwik/zwierzątko_unicode'),
                     content=safe_bytes('ćććć')),
        ]

        for node in nodes:
            self.imc.add(node)

        self.commit()
        self.assert_successful_commit(nodes)

    def commit(self, branch=None):
        self.old_commit_count = len(self.repo.commit_ids)
        self.commit_message = 'Test commit with unicode: żółwik'
        self.commit_author = f'{self.__class__.__name__} <foo@email.com>'
        self.commit = self.imc.commit(
            message=self.commit_message, author=self.commit_author,
            branch=branch)

    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
        to_add = [
            FileNode(b'foo/bar/image.png', content=b'\0'),
            FileNode(b'foo/README.txt', content=b'readme!'),
        ]
        self.imc.add(*to_add)
        commit = self.imc.commit('Initial', 'joe doe <joe.doe@example.com>')
        assert isinstance(commit.get_node('foo'), DirNode)
        assert isinstance(commit.get_node('foo/bar'), DirNode)
        self.assert_nodes_in_commit(commit, to_add)

        # commit some more files again
        to_add = [
            FileNode(b'foo/bar/foobaz/bar', content=b'foo'),
            FileNode(b'foo/bar/another/bar', content=b'foo'),
            FileNode(b'foo/baz.txt', content=b'foo'),
            FileNode(b'foobar/foobaz/file', content=b'foo'),
            FileNode(b'foobar/barbaz', content=b'foo'),
        ]
        self.imc.add(*to_add)
        commit = self.imc.commit('Another', 'joe doe <joe.doe@example.com>')
        self.assert_nodes_in_commit(commit, to_add)

    def test_add_raise_already_added(self):
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        with pytest.raises(NodeAlreadyAddedError):
            self.imc.add(node)

    def test_check_integrity_raise_already_exist(self):
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        self.imc.commit(message='Added foobar',
                        author='Some Name <foo@bar.com>')
        self.imc.add(node)
        with pytest.raises(NodeAlreadyExistsError):
            self.imc.commit(message='new message',
                            author='Some Name <foo@bar.com>')

    def test_change(self):
        self.imc.add(FileNode(b'foo/bar/baz', content=b'foo'))
        self.imc.add(FileNode(b'foo/fbar', content=b'foobar'))
        tip = self.imc.commit('Initial', 'joe doe <joe.doe@example.com>')

        # Change node's content
        node = FileNode(b'foo/bar/baz', content=b'My **changed** content')
        self.imc.change(node)
        self.imc.commit('Changed %s' % node.path, 'joe doe <joe.doe@example.com>')

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id
        self.assert_nodes_in_commit(newtip, (node,))

    def test_change_non_ascii(self):
        to_add = [
            FileNode(safe_bytes('żółwik/zwierzątko'),
                     content=safe_bytes('ćććć')),
            FileNode(safe_bytes('żółwik/zwierzątko_uni'),
                     content=safe_bytes('ćććć')),
        ]
        for node in to_add:
            self.imc.add(node)

        tip = self.imc.commit('Initial', 'joe doe <joe.doe@example.com>')

        # Change node's content
        node = FileNode(safe_bytes('żółwik/zwierzątko'),
                        content=b'My **changed** content')
        self.imc.change(node)
        self.imc.commit('Changed %s' % safe_str(node.path),
                        author='joe doe <joe.doe@example.com>')

        node_uni = FileNode(safe_bytes('żółwik/zwierzątko_uni'),
                            content=b'My **changed** content')
        self.imc.change(node_uni)
        self.imc.commit('Changed %s' % safe_str(node_uni.path),
                        author='joe doe <joe.doe@example.com>')

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id

        self.assert_nodes_in_commit(newtip, (node, node_uni))

    def test_change_raise_empty_repository(self):
        node = FileNode(b'foobar')
        with pytest.raises(EmptyRepositoryError):
            self.imc.change(node)

    def test_check_integrity_change_raise_node_does_not_exist(self):
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        self.imc.commit(message='Added foobar', author='Some Name <foo@bar.com>')
        node = FileNode(b'not-foobar', content=b'')
        self.imc.change(node)
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(message='Changed not existing node', author='Some Name <foo@bar.com>')

    def test_change_raise_node_already_changed(self):
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        self.imc.commit(message='Added foobar', author='Some Nam <foo@bar.com>')
        node = FileNode(b'foobar', content=b'more baz')
        self.imc.change(node)
        with pytest.raises(NodeAlreadyChangedError):
            self.imc.change(node)

    def test_check_integrity_change_raise_node_not_changed(self, nodes):
        self.test_add(nodes)  # Performs first commit

        node = FileNode(nodes[0].bytes_path, content=nodes[0].content)
        self.imc.change(node)
        with pytest.raises(NodeNotChangedError):
            self.imc.commit(
                message='Trying to mark node as changed without touching it',
                author='Some Name <foo@bar.com>')

    def test_change_raise_node_already_removed(self):
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        self.imc.commit(message='Added foobar', author='Some Name <foo@bar.com>')
        self.imc.remove(FileNode(b'foobar'))
        with pytest.raises(NodeAlreadyRemovedError):
            self.imc.change(node)

    def test_remove(self, nodes):
        self.test_add(nodes)  # Performs first commit

        tip = self.repo.get_commit()
        node = nodes[0]
        assert node.content == tip.get_node(node.path).content
        self.imc.remove(node)
        self.imc.commit(message=f'Removed {node.path}', author='Some Name <foo@bar.com>')

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id
        with pytest.raises(NodeDoesNotExistError):
            newtip.get_node(node.path)

    def test_remove_last_file_from_directory(self):
        node = FileNode(b'omg/qwe/foo/bar', content=b'foobar')
        self.imc.add(node)
        self.imc.commit('added', author='joe doe <joe@doe.com>')

        self.imc.remove(node)
        tip = self.imc.commit('removed', 'joe doe <joe@doe.com>')
        with pytest.raises(NodeDoesNotExistError):
            tip.get_node('omg/qwe/foo/bar')

    def test_remove_raise_node_does_not_exist(self, nodes):
        self.imc.remove(nodes[0])
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(
                message='Trying to remove node at empty repository',
                author='Some Name <foo@bar.com>')

    def test_check_integrity_remove_raise_node_does_not_exist(self, nodes):
        self.test_add(nodes)  # Performs first commit

        node = FileNode(b'no-such-file')
        self.imc.remove(node)
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(
                message='Trying to remove not existing node',
                author='Some Name <foo@bar.com>')

    def test_remove_raise_node_already_removed(self, nodes):
        self.test_add(nodes)  # Performs first commit

        node = FileNode(nodes[0].bytes_path)
        self.imc.remove(node)
        with pytest.raises(NodeAlreadyRemovedError):
            self.imc.remove(node)

    def test_remove_raise_node_already_changed(self, nodes):
        self.test_add(nodes)  # Performs first commit

        node = FileNode(nodes[0].bytes_path, content=b'Bending time')
        self.imc.change(node)
        with pytest.raises(NodeAlreadyChangedError):
            self.imc.remove(node)

    def test_reset(self):
        self.imc.add(FileNode(b'foo', content=b'bar'))
        # self.imc.change(FileNode(b'baz', content='new'))
        # self.imc.remove(FileNode(b'qwe'))
        self.imc.reset()
        assert not any((self.imc.added, self.imc.changed, self.imc.removed))

    def test_multiple_commits(self):
        N = 3  # number of commits to perform
        last = None
        for x in range(N):
            fname = safe_bytes('file%s' % str(x).rjust(5, '0'))
            content = safe_bytes('foobar\n' * x)
            node = FileNode(fname, content=content)
            self.imc.add(node)
            commit = self.imc.commit("Commit no. %s" % (x + 1), author='Vcs User <foo@bar.com>')
            assert last != commit
            last = commit

        # Check commit number for same repo
        assert len(self.repo.commit_ids) == N

        # Check commit number for recreated repo
        repo = self.Backend(self.repo_path)
        assert len(repo.commit_ids) == N

    def test_date_attr(self, local_dt_to_utc):
        node = FileNode(b'foobar.txt', content=b'Foobared!')
        self.imc.add(node)
        date = datetime.datetime(1985, 1, 30, 1, 45)
        commit = self.imc.commit(
            "Committed at time when I was born ;-)",
            author='Test User <foo@bar.com>', date=date)

        assert commit.date == local_dt_to_utc(date)

    def assert_successful_commit(self, added_nodes):
        newtip = self.repo.get_commit()
        assert self.commit == newtip
        assert self.old_commit_count + 1 == len(self.repo.commit_ids)
        assert newtip.message == self.commit_message
        assert newtip.author == self.commit_author
        assert not any((self.imc.added, self.imc.changed, self.imc.removed))
        self.assert_nodes_in_commit(newtip, added_nodes)

    def assert_nodes_in_commit(self, commit, nodes):
        for node in nodes:
            assert commit.get_node(node.path).path == node.path
            assert commit.get_node(node.path).content == node.content