# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""Backend-independent tests for the VCS repository API.

Covers repository construction, basic attributes, the deprecated
changeset-based API, compare, common-ancestor lookup, merge, strip and pull.
"""

import copy
import datetime
from urllib.error import URLError

import mock
import pytest

from rhodecode.lib.vcs import backends
from rhodecode.lib.vcs.backends.base import (
    Config,
    BaseInMemoryCommit,
    Reference,
    MergeResponse,
    MergeFailureReason,
)
from rhodecode.lib.vcs.exceptions import VCSError, RepositoryError
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.tests.vcs.conftest import BackendTestMixin
from rhodecode.tests import repo_id_generator


@pytest.mark.usefixtures("vcs_repository_support")
class TestRepositoryBase(BackendTestMixin):
    """Basic repository behaviour checked against ``self.repo``."""

    recreate_repo_per_test = False

    def test_init_accepts_unicode_path(self, tmpdir):
        path = str(tmpdir.join("unicode ä"))
        self.Backend(path, create=True)

    def test_init_accepts_str_path(self, tmpdir):
        path = str(tmpdir.join("str ä"))
        self.Backend(path, create=True)

    def test_init_fails_if_path_does_not_exist(self, tmpdir):
        path = str(tmpdir.join("i-do-not-exist"))
        with pytest.raises(VCSError):
            self.Backend(path)

    def test_init_fails_if_path_is_not_a_valid_repository(self, tmpdir):
        # The directory exists, but nothing was initialised inside it.
        path = str(tmpdir.mkdir("unicode ä"))
        with pytest.raises(VCSError):
            self.Backend(path)

    def test_has_commits_attribute(self):
        assert self.repo.commit_ids

    def test_name(self):
        assert self.repo.name.startswith("vcs-test")

    @pytest.mark.backends("hg", "git")
    def test_has_default_branch_name(self):
        assert self.repo.DEFAULT_BRANCH_NAME is not None

    @pytest.mark.backends("svn")
    def test_has_no_default_branch_name(self):
        assert self.repo.DEFAULT_BRANCH_NAME is None

    def test_has_empty_commit(self):
        assert self.repo.EMPTY_COMMIT_ID is not None
        assert self.repo.EMPTY_COMMIT is not None

    def test_empty_changeset_is_deprecated(self):
        with pytest.deprecated_call():
            self.repo.EMPTY_CHANGESET

    def test_bookmarks(self):
        assert len(self.repo.bookmarks) == 0

    def test_check_url_on_path(self):
        config = Config()
        assert self.Backend.check_url(self.repo.path, config)

    def test_check_url_on_remote_url(self):
        config = Config()
        # One publicly reachable URL per backend alias.
        url = {
            "hg": "https://code.rhodecode.com/rhodecode-vcsserver",
            "svn": "https://default:default@code.rhodecode.com/svn-doc",
            "git": "https://code.rhodecode.com/appenlight",
        }[self.repo.alias]
        assert self.Backend.check_url(url, config)

    def test_check_url_invalid(self):
        config = Config()
        with pytest.raises(URLError):
            self.Backend.check_url(self.repo.path + "invalid", config)

    def test_get_contact(self):
        assert self.repo.contact

    def test_get_description(self):
        assert self.repo.description

    def test_get_hook_location(self):
        assert len(self.repo.get_hook_location()) != 0

    def test_last_change(self, local_dt_to_utc):
        assert self.repo.last_change >= local_dt_to_utc(
            datetime.datetime(2010, 1, 1, 21, 0)
        )

    def test_last_change_in_empty_repository(self, vcsbackend, local_dt_to_utc):
        # Allow one second of slack on either side of the creation window.
        delta = datetime.timedelta(seconds=1)

        start = local_dt_to_utc(datetime.datetime.now())
        empty_repo = vcsbackend.create_repo()
        now = local_dt_to_utc(datetime.datetime.now())

        assert empty_repo.last_change >= start - delta
        assert empty_repo.last_change <= now + delta

    def test_repo_equality(self):
        assert self.repo == self.repo

    def test_repo_equality_broken_object(self):
        # A shallow copy with its ``path`` attribute removed must not compare
        # equal to the intact repository.
        _repo = copy.copy(self.repo)
        delattr(_repo, "path")
        assert self.repo != _repo

    def test_repo_equality_other_object(self):
        # Sharing the same ``path`` is not enough for an unrelated type.
        class Dummy:
            path = self.repo.path

        assert self.repo != Dummy()

    def test_get_commit_is_implemented(self):
        self.repo.get_commit()

    def test_get_commits_is_implemented(self):
        commit_iter = iter(self.repo.get_commits())
        commit = next(commit_iter)
        assert commit.idx == 0

    def test_supports_iteration(self):
        repo_iter = iter(self.repo)
        commit = next(repo_iter)
        assert commit.idx == 0

    def test_in_memory_commit(self):
        imc = self.repo.in_memory_commit
        assert isinstance(imc, BaseInMemoryCommit)

    @pytest.mark.backends("hg")
    def test__get_url_unicode(self):
        url = "/home/repos/malmö"
        assert self.repo._get_url(url)


@pytest.mark.usefixtures("vcs_repository_support")
class TestDeprecatedRepositoryAPI(BackendTestMixin):
    """Each legacy changeset/revision accessor must emit a deprecation warning."""

    recreate_repo_per_test = False

    def test_revisions_is_deprecated(self):
        with pytest.deprecated_call():
            self.repo.revisions

    def test_get_changeset_is_deprecated(self):
        with pytest.deprecated_call():
            self.repo.get_changeset()

    def test_get_changesets_is_deprecated(self):
        with pytest.deprecated_call():
            self.repo.get_changesets()

    def test_in_memory_changeset_is_deprecated(self):
        with pytest.deprecated_call():
            self.repo.in_memory_changeset


# TODO: these tests are incomplete, must check the resulting compare result for
# correctness
class TestRepositoryCompare:
    """Smoke tests: ``compare`` must run without raising."""

    @pytest.mark.parametrize("merge", [True, False])
    def test_compare_commits_of_same_repository(self, vcsbackend, merge):
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        target_repo.compare(
            target_repo[1].raw_id, target_repo[3].raw_id, target_repo, merge=merge
        )

    @pytest.mark.xfail_backends("svn")
    @pytest.mark.parametrize("merge", [True, False])
    def test_compare_cloned_repositories(self, vcsbackend, merge):
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        source_repo = vcsbackend.clone_repo(target_repo)
        assert target_repo != source_repo

        vcsbackend.add_file(source_repo, b"newfile", b"somecontent")

        target_repo.compare(
            target_repo[1].raw_id, source_repo[3].raw_id, source_repo, merge=merge
        )

    @pytest.mark.xfail_backends("svn")
    @pytest.mark.parametrize("merge", [True, False])
    def test_compare_unrelated_repositories(self, vcsbackend, merge):
        orig = vcsbackend.create_repo(number_of_commits=5)
        unrelated = vcsbackend.create_repo(number_of_commits=5)
        assert orig != unrelated

        orig.compare(orig[1].raw_id, unrelated[3].raw_id, unrelated, merge=merge)


class TestRepositoryGetCommonAncestor:
    """Tests for ``get_common_ancestor`` within and across repositories."""

    def test_get_common_ancestor_from_same_repo_existing(self, vcsbackend):
        target_repo = vcsbackend.create_repo(number_of_commits=5)

        expected_ancestor = target_repo[2].raw_id

        # The result must not depend on the order of the two commit ids.
        assert (
            target_repo.get_common_ancestor(
                commit_id1=target_repo[2].raw_id,
                commit_id2=target_repo[4].raw_id,
                repo2=target_repo,
            )
            == expected_ancestor
        )

        assert (
            target_repo.get_common_ancestor(
                commit_id1=target_repo[4].raw_id,
                commit_id2=target_repo[2].raw_id,
                repo2=target_repo,
            )
            == expected_ancestor
        )

    @pytest.mark.xfail_backends("svn")
    def test_get_common_ancestor_from_cloned_repo_existing(self, vcsbackend):
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        source_repo = vcsbackend.clone_repo(target_repo)
        assert target_repo != source_repo

        vcsbackend.add_file(source_repo, b"newfile", b"somecontent")
        source_commit = source_repo.get_commit()

        # The clone diverges only after the target's last commit.
        expected_ancestor = target_repo[4].raw_id

        assert (
            target_repo.get_common_ancestor(
                commit_id1=target_repo[4].raw_id,
                commit_id2=source_commit.raw_id,
                repo2=source_repo,
            )
            == expected_ancestor
        )

        assert (
            target_repo.get_common_ancestor(
                commit_id1=source_commit.raw_id,
                commit_id2=target_repo[4].raw_id,
                repo2=target_repo,
            )
            == expected_ancestor
        )

    @pytest.mark.xfail_backends("svn")
    def test_get_common_ancestor_from_unrelated_repo_missing(self, vcsbackend):
        original = vcsbackend.create_repo(number_of_commits=5)
        unrelated = vcsbackend.create_repo(number_of_commits=5)
        assert original != unrelated

        assert (
            original.get_common_ancestor(
                commit_id1=original[0].raw_id,
                commit_id2=unrelated[0].raw_id,
                repo2=unrelated,
            )
            is None
        )

        assert (
            original.get_common_ancestor(
                commit_id1=original[-1].raw_id,
                commit_id2=unrelated[-1].raw_id,
                repo2=unrelated,
            )
            is None
        )


@pytest.mark.backends("git", "hg")
class TestRepositoryMerge:
    """Tests for ``merge`` between a target repository and its clone."""

    def _record_merge_inputs(self, vcsbackend):
        """Store tip commits, branch references and ids on ``self``.

        Expects ``self.target_repo`` and ``self.source_repo`` to be set.
        """
        self.target_commit = self.target_repo.get_commit()
        self.source_commit = self.source_repo.get_commit()
        # This only works for Git and Mercurial
        default_branch = self.target_repo.DEFAULT_BRANCH_NAME
        self.target_ref = Reference("branch", default_branch, self.target_commit.raw_id)
        self.source_ref = Reference("branch", default_branch, self.source_commit.raw_id)
        self.workspace_id = "test-merge-{}".format(vcsbackend.alias)
        self.repo_id = repo_id_generator(self.target_repo.path)

    def prepare_for_success(self, vcsbackend):
        """Create target and source repos that add different files."""
        self.target_repo = vcsbackend.create_repo(number_of_commits=1)
        self.source_repo = vcsbackend.clone_repo(self.target_repo)
        vcsbackend.add_file(self.target_repo, b"README_MERGE1", b"Version 1")
        vcsbackend.add_file(self.source_repo, b"README_MERGE2", b"Version 2")
        imc = self.source_repo.in_memory_commit
        imc.add(FileNode(b"file_x", content=self.source_repo.name))
        # NOTE(review): the author has no "<email>" part (trailing space only);
        # it looks truncated - confirm the intended value.
        imc.commit(
            message="Automatic commit from repo merge test",
            author="Automatic ",
        )
        self._record_merge_inputs(vcsbackend)

    def prepare_for_conflict(self, vcsbackend):
        """Create target and source repos that add the same file differently."""
        self.target_repo = vcsbackend.create_repo(number_of_commits=1)
        self.source_repo = vcsbackend.clone_repo(self.target_repo)
        vcsbackend.add_file(self.target_repo, b"README_MERGE", b"Version 1")
        vcsbackend.add_file(self.source_repo, b"README_MERGE", b"Version 2")
        self._record_merge_inputs(vcsbackend)

    def test_merge_success(self, vcsbackend):
        self.prepare_for_success(vcsbackend)

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            "test user",
            "test@rhodecode.com",
            "merge message 1",
            dry_run=False,
        )
        expected_merge_response = MergeResponse(
            True, True, merge_response.merge_ref, MergeFailureReason.NONE
        )
        assert merge_response == expected_merge_response

        # Re-open the repository so the freshly created commits are visible.
        target_repo = backends.get_backend(vcsbackend.alias)(self.target_repo.path)
        target_commits = list(target_repo.get_commits())
        commit_ids = [c.raw_id for c in target_commits[:-1]]
        assert self.source_ref.commit_id in commit_ids
        assert self.target_ref.commit_id in commit_ids

        merge_commit = target_commits[-1]
        assert merge_commit.raw_id == merge_response.merge_ref.commit_id
        assert merge_commit.message.strip() == "merge message 1"
        # Expected author is built from the name and e-mail passed to merge().
        assert merge_commit.author == "test user <test@rhodecode.com>"

        # We call it twice so to make sure we can handle updates
        target_ref = Reference(
            self.target_ref.type,
            self.target_ref.name,
            merge_response.merge_ref.commit_id,
        )

        merge_response = target_repo.merge(
            self.repo_id,
            self.workspace_id,
            target_ref,
            self.source_repo,
            self.source_ref,
            "test user",
            "test@rhodecode.com",
            "merge message 2",
            dry_run=False,
        )
        expected_merge_response = MergeResponse(
            True, True, merge_response.merge_ref, MergeFailureReason.NONE
        )
        assert merge_response == expected_merge_response

        target_repo = backends.get_backend(vcsbackend.alias)(self.target_repo.path)
        merge_commit = target_repo.get_commit(merge_response.merge_ref.commit_id)
        # The commit behind the returned merge ref still carries the first
        # message, not "merge message 2".
        assert merge_commit.message.strip() == "merge message 1"
        assert merge_commit.author == "test user <test@rhodecode.com>"

    def test_merge_success_dry_run(self, vcsbackend):
        self.prepare_for_success(vcsbackend)

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            dry_run=True,
        )

        # We call it twice so to make sure we can handle updates
        merge_response_update = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            dry_run=True,
        )

        assert merge_response.merge_ref
        assert merge_response_update.merge_ref

        # Multiple merges may differ in their commit id. Therefore, we
        # overwrite the commit id of both responses with the same placeholder
        # value before comparing them.
        merge_response.merge_ref.commit_id = 'abcdeabcde'
        merge_response_update.merge_ref.commit_id = 'abcdeabcde'

        assert merge_response == merge_response_update
        assert merge_response.possible is True
        assert merge_response.executed is False
        assert merge_response.merge_ref
        assert merge_response.failure_reason is MergeFailureReason.NONE

    @pytest.mark.parametrize("dry_run", [True, False])
    def test_merge_conflict(self, vcsbackend, dry_run):
        self.prepare_for_conflict(vcsbackend)

        expected_merge_response = MergeResponse(
            False, False, None, MergeFailureReason.MERGE_FAILED
        )

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            "test_user",
            "test@rhodecode.com",
            "test message",
            dry_run=dry_run,
        )
        assert merge_response == expected_merge_response

        # We call it twice so to make sure we can handle updates
        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            "test_user",
            "test@rhodecode.com",
            "test message",
            dry_run=dry_run,
        )
        assert merge_response == expected_merge_response

    def test_merge_target_is_not_head(self, vcsbackend):
        self.prepare_for_success(vcsbackend)
        # Point the target reference at an all-zero commit id.
        target_ref = Reference(self.target_ref.type, self.target_ref.name, "0" * 40)
        expected_merge_response = MergeResponse(
            False,
            False,
            None,
            MergeFailureReason.TARGET_IS_NOT_HEAD,
            metadata={"target_ref": target_ref},
        )
        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            target_ref,
            self.source_repo,
            self.source_ref,
            dry_run=True,
        )

        assert merge_response == expected_merge_response

    def test_merge_missing_source_reference(self, vcsbackend):
        self.prepare_for_success(vcsbackend)

        source_ref = Reference(
            self.source_ref.type, "not_existing", self.source_ref.commit_id
        )
        expected_merge_response = MergeResponse(
            False,
            False,
            None,
            MergeFailureReason.MISSING_SOURCE_REF,
            metadata={"source_ref": source_ref},
        )

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            source_ref,
            dry_run=True,
        )

        assert merge_response == expected_merge_response

    def test_merge_raises_exception(self, vcsbackend):
        self.prepare_for_success(vcsbackend)
        expected_merge_response = MergeResponse(
            False,
            False,
            None,
            MergeFailureReason.UNKNOWN,
            metadata={"exception": "ErrorForTest"},
        )

        # Force the internal merge step to fail with a RepositoryError.
        with mock.patch.object(
            self.target_repo, "_merge_repo", side_effect=RepositoryError()
        ):
            merge_response = self.target_repo.merge(
                self.repo_id,
                self.workspace_id,
                self.target_ref,
                self.source_repo,
                self.source_ref,
                dry_run=True,
            )

        assert merge_response == expected_merge_response

    def test_merge_invalid_user_name(self, vcsbackend):
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference("branch", "master", "not_used")
        workspace_id = "test-errors-in-merge"
        repo_id = repo_id_generator(workspace_id)
        # ``self`` only fills the source-repository slot here.
        with pytest.raises(ValueError):
            repo.merge(repo_id, workspace_id, ref, self, ref)

    def test_merge_invalid_user_email(self, vcsbackend):
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference("branch", "master", "not_used")
        workspace_id = "test-errors-in-merge"
        repo_id = repo_id_generator(workspace_id)
        with pytest.raises(ValueError):
            repo.merge(repo_id, workspace_id, ref, self, ref, "user name")

    def test_merge_invalid_message(self, vcsbackend):
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference("branch", "master", "not_used")
        workspace_id = "test-errors-in-merge"
        repo_id = repo_id_generator(workspace_id)
        with pytest.raises(ValueError):
            repo.merge(
                repo_id, workspace_id, ref, self, ref, "user name", "user@email.com"
            )


@pytest.mark.usefixtures("vcs_repository_support")
class TestRepositoryStrip(BackendTestMixin):
    """Tests for ``strip`` on a repository with eleven linear commits."""

    recreate_repo_per_test = True

    @classmethod
    def _get_commits(cls):
        """Return one initial commit followed by ten changes to ``foobar``."""
        # NOTE(review): both author strings below have no "<email>" part
        # (trailing space only); they look truncated - confirm.
        commits = [
            {
                "message": "Initial commit",
                "author": "Joe Doe ",
                "date": datetime.datetime(2010, 1, 1, 20),
                "branch": "master",
                "added": [
                    FileNode(b"foobar", content="foobar"),
                    FileNode(b"foobar2", content="foobar2"),
                ],
            },
        ]
        for x in range(10):
            commit_data = {
                "message": "Changed foobar - commit%s" % x,
                "author": "Jane Doe ",
                "date": datetime.datetime(2010, 1, 1, 21, x),
                "branch": "master",
                "changed": [
                    FileNode(b"foobar", "FOOBAR - %s" % x),
                ],
            }
            commits.append(commit_data)
        return commits

    @pytest.mark.backends("git", "hg")
    def test_strip_commit(self):
        tip = self.repo.get_commit()
        assert tip.idx == 10
        self.repo.strip(tip.raw_id, self.repo.DEFAULT_BRANCH_NAME)

        tip = self.repo.get_commit()
        assert tip.idx == 9

    @pytest.mark.backends("git", "hg")
    def test_strip_multiple_commits(self):
        tip = self.repo.get_commit()
        assert tip.idx == 10

        # Stripping commit 5 leaves commit 4 as the new tip.
        old = self.repo.get_commit(commit_idx=5)
        self.repo.strip(old.raw_id, self.repo.DEFAULT_BRANCH_NAME)

        tip = self.repo.get_commit()
        assert tip.idx == 4


@pytest.mark.backends("hg", "git")
class TestRepositoryPull:
    """Tests for ``pull`` from another local repository path."""

    def test_pull(self, vcsbackend):
        source_repo = vcsbackend.repo
        target_repo = vcsbackend.create_repo()
        assert len(source_repo.commit_ids) > len(target_repo.commit_ids)

        target_repo.pull(source_repo.path)
        # Note: Get a fresh instance, avoids caching trouble
        target_repo = vcsbackend.backend(target_repo.path)
        assert len(source_repo.commit_ids) == len(target_repo.commit_ids)

    def test_pull_wrong_path(self, vcsbackend):
        target_repo = vcsbackend.create_repo()
        with pytest.raises(RepositoryError):
            target_repo.pull(target_repo.path + "wrong")

    def test_pull_specific_commits(self, vcsbackend):
        source_repo = vcsbackend.repo
        target_repo = vcsbackend.create_repo()

        second_commit = source_repo[1].raw_id
        if vcsbackend.alias == "git":
            # For git, a ref is set on the commit before pulling it.
            second_commit_ref = "refs/test-refs/a"
            source_repo.set_refs(second_commit_ref, second_commit)

        target_repo.pull(source_repo.path, commit_ids=[second_commit])
        target_repo = vcsbackend.backend(target_repo.path)
        assert len(target_repo.commit_ids) == 2
        assert target_repo.get_commit().raw_id == second_commit