test_repository.py
630 lines
| 22.3 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import datetime | ||||
r4914 | from urllib.error import URLError | |||
r1 | ||||
import mock | ||||
import pytest | ||||
from rhodecode.lib.vcs import backends | ||||
from rhodecode.lib.vcs.backends.base import ( | ||||
r5177 | Config, | |||
BaseInMemoryCommit, | ||||
Reference, | ||||
MergeResponse, | ||||
MergeFailureReason, | ||||
) | ||||
r1 | from rhodecode.lib.vcs.exceptions import VCSError, RepositoryError | |||
from rhodecode.lib.vcs.nodes import FileNode | ||||
r2453 | from rhodecode.tests.vcs.conftest import BackendTestMixin | |||
r2810 | from rhodecode.tests import repo_id_generator | |||
r1 | ||||
@pytest.mark.usefixtures("vcs_repository_support")
class TestRepositoryBase(BackendTestMixin):
    """Basic repository API checks run through ``self.Backend`` and ``self.repo``."""

    recreate_repo_per_test = False

    def test_init_accepts_unicode_path(self, tmpdir):
        repo_path = str(tmpdir.join("unicode ä"))
        self.Backend(repo_path, create=True)

    def test_init_accepts_str_path(self, tmpdir):
        repo_path = str(tmpdir.join("str ä"))
        self.Backend(repo_path, create=True)

    def test_init_fails_if_path_does_not_exist(self, tmpdir):
        missing_path = str(tmpdir.join("i-do-not-exist"))
        with pytest.raises(VCSError):
            self.Backend(missing_path)

    def test_init_fails_if_path_is_not_a_valid_repository(self, tmpdir):
        # The directory is created here, so the path exists but no repository
        # has been initialised in it.
        plain_directory = str(tmpdir.mkdir("unicode ä"))
        with pytest.raises(VCSError):
            self.Backend(plain_directory)

    def test_has_commits_attribute(self):
        assert self.repo.commit_ids

    def test_name(self):
        repo_name = self.repo.name
        assert repo_name.startswith("vcs-test")

    @pytest.mark.backends("hg", "git")
    def test_has_default_branch_name(self):
        assert self.repo.DEFAULT_BRANCH_NAME is not None

    @pytest.mark.backends("svn")
    def test_has_no_default_branch_name(self):
        assert self.repo.DEFAULT_BRANCH_NAME is None

    def test_has_empty_commit(self):
        assert self.repo.EMPTY_COMMIT_ID is not None
        assert self.repo.EMPTY_COMMIT is not None

    def test_empty_changeset_is_deprecated(self):
        # Reading the attribute is what has to emit the deprecation warning.
        with pytest.deprecated_call():
            _ = self.repo.EMPTY_CHANGESET

    def test_bookmarks(self):
        assert len(self.repo.bookmarks) == 0

    def test_check_url_on_path(self):
        config = Config()
        assert self.Backend.check_url(self.repo.path, config)

    def test_check_url_on_remote_url(self):
        config = Config()
        # One remote URL per backend alias; the one matching the repository
        # under test is checked.
        remote_urls = {
            "hg": "https://code.rhodecode.com/rhodecode-vcsserver",
            "svn": "https://default:default@code.rhodecode.com/svn-doc",
            "git": "https://code.rhodecode.com/appenlight",
        }
        url = remote_urls[self.repo.alias]
        assert self.Backend.check_url(url, config)

    def test_check_url_invalid(self):
        config = Config()
        broken_location = self.repo.path + "invalid"
        with pytest.raises(URLError):
            self.Backend.check_url(broken_location, config)

    def test_get_contact(self):
        assert self.repo.contact

    def test_get_description(self):
        assert self.repo.description

    def test_get_hook_location(self):
        assert len(self.repo.get_hook_location()) != 0

    def test_last_change(self, local_dt_to_utc):
        earliest_expected = local_dt_to_utc(datetime.datetime(2010, 1, 1, 21, 0))
        assert self.repo.last_change >= earliest_expected

    def test_last_change_in_empty_repository(self, vcsbackend, local_dt_to_utc):
        # Allow one second of slack on both sides of the creation window.
        tolerance = datetime.timedelta(seconds=1)

        before_creation = local_dt_to_utc(datetime.datetime.now())
        empty_repo = vcsbackend.create_repo()
        after_creation = local_dt_to_utc(datetime.datetime.now())

        assert empty_repo.last_change >= before_creation - tolerance
        assert empty_repo.last_change <= after_creation + tolerance

    def test_repo_equality(self):
        assert self.repo == self.repo

    def test_repo_equality_broken_object(self):
        import copy

        # A shallow copy whose ``path`` attribute has been removed must not
        # compare equal to the intact repository.
        damaged_repo = copy.copy(self.repo)
        del damaged_repo.path
        assert self.repo != damaged_repo

    def test_repo_equality_other_object(self):
        # An unrelated object that merely exposes the same ``path`` value.
        class PathOnly(object):
            path = self.repo.path

        assert self.repo != PathOnly()

    def test_get_commit_is_implemented(self):
        self.repo.get_commit()

    def test_get_commits_is_implemented(self):
        first_commit = next(iter(self.repo.get_commits()))
        assert first_commit.idx == 0

    def test_supports_iteration(self):
        first_commit = next(iter(self.repo))
        assert first_commit.idx == 0

    def test_in_memory_commit(self):
        in_memory = self.repo.in_memory_commit
        assert isinstance(in_memory, BaseInMemoryCommit)

    @pytest.mark.backends("hg")
    def test__get_url_unicode(self):
        non_ascii_url = "/home/repos/malmö"
        assert self.repo._get_url(non_ascii_url)
@pytest.mark.usefixtures("vcs_repository_support")
class TestDeprecatedRepositoryAPI(BackendTestMixin):
    """Each legacy changeset/revision accessor must emit a deprecation warning."""

    recreate_repo_per_test = False

    def test_revisions_is_deprecated(self):
        # Attribute access itself is expected to warn.
        with pytest.deprecated_call():
            _ = self.repo.revisions

    def test_get_changeset_is_deprecated(self):
        with pytest.deprecated_call():
            self.repo.get_changeset()

    def test_get_changesets_is_deprecated(self):
        with pytest.deprecated_call():
            self.repo.get_changesets()

    def test_in_memory_changeset_is_deprecated(self):
        # Attribute access itself is expected to warn.
        with pytest.deprecated_call():
            _ = self.repo.in_memory_changeset
# TODO: these tests are incomplete; they must also check the resulting compare
# result for correctness.
class TestRepositoryCompare:
    """Smoke tests: ``compare`` is only called, its result is not inspected."""

    @pytest.mark.parametrize("merge", [True, False])
    def test_compare_commits_of_same_repository(self, vcsbackend, merge):
        repo = vcsbackend.create_repo(number_of_commits=5)
        first_id = repo[1].raw_id
        second_id = repo[3].raw_id
        repo.compare(first_id, second_id, repo, merge=merge)

    @pytest.mark.xfail_backends("svn")
    @pytest.mark.parametrize("merge", [True, False])
    def test_compare_cloned_repositories(self, vcsbackend, merge):
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        source_repo = vcsbackend.clone_repo(target_repo)
        assert target_repo != source_repo

        # Give the clone one commit that the target does not have.
        vcsbackend.add_file(source_repo, b"newfile", b"somecontent")
        # NOTE(review): source_commit is not used below - confirm whether the
        # comparison was meant to use it instead of source_repo[3].
        source_commit = source_repo.get_commit()

        target_id = target_repo[1].raw_id
        source_id = source_repo[3].raw_id
        target_repo.compare(target_id, source_id, source_repo, merge=merge)

    @pytest.mark.xfail_backends("svn")
    @pytest.mark.parametrize("merge", [True, False])
    def test_compare_unrelated_repositories(self, vcsbackend, merge):
        orig = vcsbackend.create_repo(number_of_commits=5)
        unrelated = vcsbackend.create_repo(number_of_commits=5)
        assert orig != unrelated

        orig_id = orig[1].raw_id
        unrelated_id = unrelated[3].raw_id
        orig.compare(orig_id, unrelated_id, unrelated, merge=merge)
class TestRepositoryGetCommonAncestor:
    """Checks ``get_common_ancestor`` for same, cloned and unrelated repositories."""

    def test_get_common_ancestor_from_same_repo_existing(self, vcsbackend):
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        expected_ancestor = target_repo[2].raw_id

        forward = target_repo.get_common_ancestor(
            commit_id1=target_repo[2].raw_id,
            commit_id2=target_repo[4].raw_id,
            repo2=target_repo,
        )
        assert forward == expected_ancestor

        # Same pair of commits with the argument order swapped.
        backward = target_repo.get_common_ancestor(
            commit_id1=target_repo[4].raw_id,
            commit_id2=target_repo[2].raw_id,
            repo2=target_repo,
        )
        assert backward == expected_ancestor

    @pytest.mark.xfail_backends("svn")
    def test_get_common_ancestor_from_cloned_repo_existing(self, vcsbackend):
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        source_repo = vcsbackend.clone_repo(target_repo)
        assert target_repo != source_repo

        # The clone gets one extra commit on top of the shared history.
        vcsbackend.add_file(source_repo, b"newfile", b"somecontent")
        source_commit = source_repo.get_commit()

        expected_ancestor = target_repo[4].raw_id

        seen_from_target = target_repo.get_common_ancestor(
            commit_id1=target_repo[4].raw_id,
            commit_id2=source_commit.raw_id,
            repo2=source_repo,
        )
        assert seen_from_target == expected_ancestor

        # Swapped commit ids, with the target repository passed as repo2.
        swapped = target_repo.get_common_ancestor(
            commit_id1=source_commit.raw_id,
            commit_id2=target_repo[4].raw_id,
            repo2=target_repo,
        )
        assert swapped == expected_ancestor

    @pytest.mark.xfail_backends("svn")
    def test_get_common_ancestor_from_unrelated_repo_missing(self, vcsbackend):
        original = vcsbackend.create_repo(number_of_commits=5)
        unrelated = vcsbackend.create_repo(number_of_commits=5)
        assert original != unrelated

        ancestor_of_first_commits = original.get_common_ancestor(
            commit_id1=original[0].raw_id,
            commit_id2=unrelated[0].raw_id,
            repo2=unrelated,
        )
        assert ancestor_of_first_commits is None

        ancestor_of_last_commits = original.get_common_ancestor(
            commit_id1=original[-1].raw_id,
            commit_id2=unrelated[-1].raw_id,
            repo2=unrelated,
        )
        assert ancestor_of_last_commits is None
@pytest.mark.backends("git", "hg")
class TestRepositoryMerge(object):
    """Tests of ``merge`` between a target repository and a clone of it.

    The ``prepare_for_*`` methods store the repositories, their tip commits,
    the branch references and the workspace/repo ids on ``self``.
    """

    def _record_merge_refs(self, vcsbackend):
        """Store tip commits, branch references and ids on ``self``.

        Shared tail of ``prepare_for_success`` and ``prepare_for_conflict``;
        ``self.target_repo`` and ``self.source_repo`` must already be set.
        """
        self.target_commit = self.target_repo.get_commit()
        self.source_commit = self.source_repo.get_commit()
        # This only works for Git and Mercurial
        default_branch = self.target_repo.DEFAULT_BRANCH_NAME
        self.target_ref = Reference("branch", default_branch, self.target_commit.raw_id)
        self.source_ref = Reference("branch", default_branch, self.source_commit.raw_id)
        self.workspace_id = "test-merge-{}".format(vcsbackend.alias)
        self.repo_id = repo_id_generator(self.target_repo.path)

    def prepare_for_success(self, vcsbackend):
        """Create target and source repositories that touch different files."""
        self.target_repo = vcsbackend.create_repo(number_of_commits=1)
        self.source_repo = vcsbackend.clone_repo(self.target_repo)
        vcsbackend.add_file(self.target_repo, b"README_MERGE1", b"Version 1")
        vcsbackend.add_file(self.source_repo, b"README_MERGE2", b"Version 2")
        # One more commit on the source side, made through the in-memory API.
        imc = self.source_repo.in_memory_commit
        imc.add(FileNode(b"file_x", content=self.source_repo.name))
        imc.commit(
            message="Automatic commit from repo merge test",
            author="Automatic <automatic@rhodecode.com>",
        )
        self._record_merge_refs(vcsbackend)

    def prepare_for_conflict(self, vcsbackend):
        """Create target and source repositories that add the same file differently."""
        self.target_repo = vcsbackend.create_repo(number_of_commits=1)
        self.source_repo = vcsbackend.clone_repo(self.target_repo)
        vcsbackend.add_file(self.target_repo, b"README_MERGE", b"Version 1")
        vcsbackend.add_file(self.source_repo, b"README_MERGE", b"Version 2")
        self._record_merge_refs(vcsbackend)

    def test_merge_success(self, vcsbackend):
        self.prepare_for_success(vcsbackend)

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            "test user",
            "test@rhodecode.com",
            "merge message 1",
            dry_run=False,
        )
        expected_merge_response = MergeResponse(True, True, merge_response.merge_ref, MergeFailureReason.NONE)
        assert merge_response == expected_merge_response

        # Re-open the target repository and inspect its history.
        target_repo = backends.get_backend(vcsbackend.alias)(self.target_repo.path)
        target_commits = list(target_repo.get_commits())
        commit_ids = [c.raw_id for c in target_commits[:-1]]
        assert self.source_ref.commit_id in commit_ids
        assert self.target_ref.commit_id in commit_ids

        merge_commit = target_commits[-1]
        assert merge_commit.raw_id == merge_response.merge_ref.commit_id
        assert merge_commit.message.strip() == "merge message 1"
        assert merge_commit.author == "test user <test@rhodecode.com>"

        # We call it twice so to make sure we can handle updates
        target_ref = Reference(
            self.target_ref.type,
            self.target_ref.name,
            merge_response.merge_ref.commit_id,
        )

        merge_response = target_repo.merge(
            self.repo_id,
            self.workspace_id,
            target_ref,
            self.source_repo,
            self.source_ref,
            "test user",
            "test@rhodecode.com",
            "merge message 2",
            dry_run=False,
        )
        expected_merge_response = MergeResponse(True, True, merge_response.merge_ref, MergeFailureReason.NONE)
        assert merge_response == expected_merge_response

        target_repo = backends.get_backend(vcsbackend.alias)(self.target_repo.path)
        merge_commit = target_repo.get_commit(merge_response.merge_ref.commit_id)
        # The commit referenced by the second response is asserted to still
        # carry the message of the first merge.
        assert merge_commit.message.strip() == "merge message 1"
        assert merge_commit.author == "test user <test@rhodecode.com>"

    def test_merge_success_dry_run(self, vcsbackend):
        self.prepare_for_success(vcsbackend)

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            dry_run=True,
        )

        # We call it twice so to make sure we can handle updates
        merge_response_update = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            dry_run=True,
        )
        assert merge_response.merge_ref
        assert merge_response_update.merge_ref

        # Multiple merges may differ in their commit id. Therefore, we overwrite
        # both commit ids with the same placeholder value before comparing the
        # merge responses.
        merge_response.merge_ref.commit_id = "abcdeabcde"
        merge_response_update.merge_ref.commit_id = "abcdeabcde"

        assert merge_response == merge_response_update
        assert merge_response.possible is True
        assert merge_response.executed is False
        assert merge_response.merge_ref
        assert merge_response.failure_reason is MergeFailureReason.NONE

    @pytest.mark.parametrize("dry_run", [True, False])
    def test_merge_conflict(self, vcsbackend, dry_run):
        self.prepare_for_conflict(vcsbackend)

        expected_merge_response = MergeResponse(False, False, None, MergeFailureReason.MERGE_FAILED)

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            "test_user",
            "test@rhodecode.com",
            "test message",
            dry_run=dry_run,
        )
        assert merge_response == expected_merge_response

        # We call it twice so to make sure we can handle updates
        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            self.source_ref,
            "test_user",
            "test@rhodecode.com",
            "test message",
            dry_run=dry_run,
        )
        assert merge_response == expected_merge_response

    def test_merge_target_is_not_head(self, vcsbackend):
        self.prepare_for_success(vcsbackend)
        # Same branch reference, but with an all-zero commit id.
        target_ref = Reference(self.target_ref.type, self.target_ref.name, "0" * 40)
        expected_merge_response = MergeResponse(
            False,
            False,
            None,
            MergeFailureReason.TARGET_IS_NOT_HEAD,
            metadata={"target_ref": target_ref},
        )

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            target_ref,
            self.source_repo,
            self.source_ref,
            dry_run=True,
        )

        assert merge_response == expected_merge_response

    def test_merge_missing_source_reference(self, vcsbackend):
        self.prepare_for_success(vcsbackend)
        # Keep type and commit id of the source reference, but change its name.
        source_ref = Reference(self.source_ref.type, "not_existing", self.source_ref.commit_id)
        expected_merge_response = MergeResponse(
            False,
            False,
            None,
            MergeFailureReason.MISSING_SOURCE_REF,
            metadata={"source_ref": source_ref},
        )

        merge_response = self.target_repo.merge(
            self.repo_id,
            self.workspace_id,
            self.target_ref,
            self.source_repo,
            source_ref,
            dry_run=True,
        )

        assert merge_response == expected_merge_response

    def test_merge_raises_exception(self, vcsbackend):
        self.prepare_for_success(vcsbackend)
        # NOTE(review): the patched error is a bare RepositoryError while the
        # expected metadata names "ErrorForTest" - confirm whether MergeResponse
        # equality takes metadata into account.
        expected_merge_response = MergeResponse(
            False,
            False,
            None,
            MergeFailureReason.UNKNOWN,
            metadata={"exception": "ErrorForTest"},
        )

        with mock.patch.object(self.target_repo, "_merge_repo", side_effect=RepositoryError()):
            merge_response = self.target_repo.merge(
                self.repo_id,
                self.workspace_id,
                self.target_ref,
                self.source_repo,
                self.source_ref,
                dry_run=True,
            )

        assert merge_response == expected_merge_response

    def test_merge_invalid_user_name(self, vcsbackend):
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference("branch", "master", "not_used")
        workspace_id = "test-errors-in-merge"
        repo_id = repo_id_generator(workspace_id)
        # ``self`` is passed in the source-repository position as a stand-in;
        # presumably it is never inspected because ValueError is raised first.
        with pytest.raises(ValueError):
            repo.merge(repo_id, workspace_id, ref, self, ref)

    def test_merge_invalid_user_email(self, vcsbackend):
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference("branch", "master", "not_used")
        workspace_id = "test-errors-in-merge"
        repo_id = repo_id_generator(workspace_id)
        with pytest.raises(ValueError):
            repo.merge(repo_id, workspace_id, ref, self, ref, "user name")

    def test_merge_invalid_message(self, vcsbackend):
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference("branch", "master", "not_used")
        workspace_id = "test-errors-in-merge"
        repo_id = repo_id_generator(workspace_id)
        with pytest.raises(ValueError):
            repo.merge(repo_id, workspace_id, ref, self, ref, "user name", "user@email.com")
@pytest.mark.usefixtures("vcs_repository_support")
class TestRepositoryStrip(BackendTestMixin):
    """Tests of ``strip`` on a repository built from ``_get_commits``."""

    recreate_repo_per_test = True

    @classmethod
    def _get_commits(cls):
        """Return eleven commit descriptions: one initial commit and ten changes."""
        initial_commit = {
            "message": "Initial commit",
            "author": "Joe Doe <joe.doe@example.com>",
            "date": datetime.datetime(2010, 1, 1, 20),
            "branch": "master",
            "added": [
                FileNode(b"foobar", content="foobar"),
                FileNode(b"foobar2", content="foobar2"),
            ],
        }
        # Ten follow-up commits, one minute apart, each rewriting ``foobar``.
        follow_ups = [
            {
                "message": "Changed foobar - commit%s" % x,
                "author": "Jane Doe <jane.doe@example.com>",
                "date": datetime.datetime(2010, 1, 1, 21, x),
                "branch": "master",
                "changed": [
                    FileNode(b"foobar", "FOOBAR - %s" % x),
                ],
            }
            for x in range(10)
        ]
        return [initial_commit, *follow_ups]

    @pytest.mark.backends("git", "hg")
    def test_strip_commit(self):
        tip_before = self.repo.get_commit()
        assert tip_before.idx == 10

        self.repo.strip(tip_before.raw_id, self.repo.DEFAULT_BRANCH_NAME)

        tip_after = self.repo.get_commit()
        assert tip_after.idx == 9

    @pytest.mark.backends("git", "hg")
    def test_strip_multiple_commits(self):
        tip_before = self.repo.get_commit()
        assert tip_before.idx == 10

        # Stripping the commit at index 5 is expected to leave index 4 as tip.
        old = self.repo.get_commit(commit_idx=5)
        self.repo.strip(old.raw_id, self.repo.DEFAULT_BRANCH_NAME)

        tip_after = self.repo.get_commit()
        assert tip_after.idx == 4
@pytest.mark.backends("hg", "git")
class TestRepositoryPull(object):
    """Tests of ``pull`` from the shared fixture repository into a new one."""

    def test_pull(self, vcsbackend):
        source_repo = vcsbackend.repo
        target_repo = vcsbackend.create_repo()
        assert len(source_repo.commit_ids) > len(target_repo.commit_ids)

        target_repo.pull(source_repo.path)

        # Note: Get a fresh instance, avoids caching trouble
        reopened_target = vcsbackend.backend(target_repo.path)
        assert len(source_repo.commit_ids) == len(reopened_target.commit_ids)

    def test_pull_wrong_path(self, vcsbackend):
        target_repo = vcsbackend.create_repo()
        bogus_path = target_repo.path + "wrong"
        with pytest.raises(RepositoryError):
            target_repo.pull(bogus_path)

    def test_pull_specific_commits(self, vcsbackend):
        source_repo = vcsbackend.repo
        target_repo = vcsbackend.create_repo()
        second_commit = source_repo[1].raw_id

        # For git only, a ref pointing at the commit is set on the source first.
        if vcsbackend.alias == "git":
            source_repo.set_refs("refs/test-refs/a", second_commit)

        target_repo.pull(source_repo.path, commit_ids=[second_commit])

        reopened_target = vcsbackend.backend(target_repo.path)
        assert len(reopened_target.commit_ids) == 2
        assert reopened_target.get_commit().raw_id == second_commit