# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import mock
import pytest
from rhodecode.lib.str_utils import safe_bytes
from rhodecode.lib.utils import make_db_config
from rhodecode.lib.vcs import backends
from rhodecode.lib.vcs.backends.base import Reference, MergeResponse, MergeFailureReason
from rhodecode.lib.vcs.backends.hg import MercurialRepository, MercurialCommit
from rhodecode.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError, CommitDoesNotExistError
from rhodecode.lib.vcs.nodes import FileNode, NodeKind, DirNode, RootNode
from rhodecode.tests import TEST_HG_REPO, TEST_HG_REPO_CLONE, repo_id_generator
pytestmark = pytest.mark.backends("hg")
def repo_path_generator():
    """
    Yield an endless sequence of distinct paths to be used for cloning repos.

    Every path is ``TEST_HG_REPO_CLONE`` followed by ``-<n>``, where ``n``
    starts at 1 and grows by one on each step.
    """
    counter = 1
    while True:
        yield f"{TEST_HG_REPO_CLONE}-{counter:d}"
        counter += 1


# Module-wide generator instance; call next() on it to get the next clone path.
REPO_PATH_GENERATOR = repo_path_generator()
class TestMercurialRepository:
# pylint: disable=protected-access
@pytest.fixture(autouse=True)
def prepare(self):
    """Open the repository at ``TEST_HG_REPO`` and store it on ``self.repo`` for each test."""
    base_repo = MercurialRepository(TEST_HG_REPO)
    self.repo = base_repo
def get_clone_repo(self):
    """
    Create a clone of ``self.repo`` at the next generated path and return it.
    """
    return MercurialRepository(next(REPO_PATH_GENERATOR), create=True, src_url=self.repo.path)
def get_empty_repo(self):
    """
    Create a new repository (no source URL) at the next generated path and return it.
    """
    empty_path = next(REPO_PATH_GENERATOR)
    return MercurialRepository(empty_path, create=True)
def test_wrong_repo_path(self):
    """Opening ``/tmp/errorrepo_hg`` is expected to raise ``RepositoryError``."""
    with pytest.raises(RepositoryError):
        MercurialRepository("/tmp/errorrepo_hg")
def test_unicode_path_repo(self):
    """Opening the path ``iShouldFail`` is expected to raise ``VCSError``."""
    bogus_path = "iShouldFail"
    with pytest.raises(VCSError):
        MercurialRepository(bogus_path)
def test_unicode_commit_id(self):
    """Unknown commit ids, ASCII or not, must raise ``CommitDoesNotExistError``."""
    unknown_ids = ("unicode-commit-id", "unícøde-spéçial-chäråcter-commit-id")
    for commit_id in unknown_ids:
        with pytest.raises(CommitDoesNotExistError):
            self.repo.get_commit(commit_id)
def test_unicode_bookmark(self):
    """Creating bookmarks with ASCII and non-ASCII names must not raise."""
    for bookmark_name in ("unicode-bookmark", "unícøde-spéçial-chäråcter-bookmark"):
        self.repo.bookmark(bookmark_name)
def test_unicode_branch(self):
    """Looking up branch names that do not exist must raise ``KeyError``."""
    for branch_name in ("unicode-branch", "unícøde-spéçial-chäråcter-branch"):
        with pytest.raises(KeyError):
            assert self.repo.branches[branch_name]
def test_repo_clone(self):
    """Clone the test repository and check the clone holds the same commits."""
    # Refuse to run when the clone location is already taken.
    if os.path.exists(TEST_HG_REPO_CLONE):
        pytest.fail(
            f"Cannot test mercurial clone repo as location {TEST_HG_REPO_CLONE} already exists. You should manually remove it first."
        )

    source = MercurialRepository(TEST_HG_REPO)
    clone = MercurialRepository(TEST_HG_REPO_CLONE, create=True, src_url=TEST_HG_REPO)
    assert len(source.commit_ids) == len(clone.commit_ids)

    # Comparing the commit hashes should be enough
    for raw_id in (commit.raw_id for commit in source.get_commits()):
        assert raw_id == clone.get_commit(raw_id).raw_id
def test_repo_clone_with_update(self):
    """Clone with ``do_workspace_checkout=True`` and expect ``MANIFEST.in`` in the clone dir."""
    clone_path = TEST_HG_REPO_CLONE + "_w_update"
    source = MercurialRepository(TEST_HG_REPO)
    clone = MercurialRepository(clone_path, create=True, src_url=TEST_HG_REPO, do_workspace_checkout=True)
    assert len(source.commit_ids) == len(clone.commit_ids)

    # the working directory must have been updated
    manifest = os.path.join(clone_path, "MANIFEST.in")
    assert os.path.isfile(manifest)
def test_repo_clone_without_update(self):
    """Clone with ``do_workspace_checkout=False`` and expect no ``MANIFEST.in`` in the clone dir."""
    clone_path = TEST_HG_REPO_CLONE + "_wo_update"
    source = MercurialRepository(TEST_HG_REPO)
    clone = MercurialRepository(clone_path, create=True, src_url=TEST_HG_REPO, do_workspace_checkout=False)
    assert len(source.commit_ids) == len(clone.commit_ids)

    manifest = os.path.join(clone_path, "MANIFEST.in")
    assert not os.path.isfile(manifest)
def test_commit_ids(self):
    """Check that known commit ids are present and appear in the expected order."""
    # Commits assumed to be available from now on (checked as a set).
    always_available = [
        "b986218ba1c9b0d6a259fac9b050b1724ed8e545",
        "3d8f361e72ab303da48d799ff1ac40d5ac37c67e",
        "6cba7170863a2411822803fa77a0a264f1310b35",
        "56349e29c2af3ac913b28bde9a2c6154436e615b",
        "2dda4e345facb0ccff1a191052dd1606dba6781d",
        "6fff84722075f1607a30f436523403845f84cd9e",
        "7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7",
        "3803844fdbd3b711175fc3da9bdacfcd6d29a6fb",
        "dc5d2c0661b61928834a785d3e64a3f80d3aad9c",
        "be90031137367893f1c406e0a8683010fd115b79",
        "db8e58be770518cbb2b1cdfa69146e47cd481481",
        "84478366594b424af694a6c784cb991a16b87c21",
        "17f8e105dddb9f339600389c6dc7175d395a535c",
        "20a662e756499bde3095ffc9bc0643d1def2d0eb",
        "2e319b85e70a707bba0beff866d9f9de032aa4f9",
        "786facd2c61deb9cf91e9534735124fb8fc11842",
        "94593d2128d38210a2fcd1aabff6dda0d6d9edf8",
        "aa6a0de05b7612707db567078e130a6cd114a9a7",
        "eada5a770da98ab0dd7325e29d00e0714f228d09",
    ]
    # Commits that follow the ones above in the ordered listing.
    following = [
        "2c1885c735575ca478bf9e17b0029dca68824458",
        "d9bcd465040bf869799b09ad732c04e0eea99fe9",
        "469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7",
        "4fb8326d78e5120da2c7468dcf7098997be385da",
        "62b4a097164940bd66030c4db51687f3ec035eed",
        "536c1a19428381cfea92ac44985304f6a8049569",
        "965e8ab3c44b070cdaa5bf727ddef0ada980ecc4",
        "9bb326a04ae5d98d437dece54be04f830cf1edd9",
        "f8940bcb890a98c4702319fbe36db75ea309b475",
        "ff5ab059786ebc7411e559a2cc309dfae3625a3b",
        "6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08",
        "ee87846a61c12153b51543bf860e1026c6d3dcba",
    ]

    subset = set(always_available)
    assert subset.issubset(set(self.repo.commit_ids))

    # check if we have the proper order of commits
    expected_order = always_available + following
    assert expected_order == self.repo.commit_ids[: len(expected_order)]
def test_iter_slice(self):
    """Slicing the repository must yield the same commits as slicing its full iteration."""
    via_repo_slice = list(self.repo[:10])
    via_list_slice = list(self.repo)[:10]
    assert via_repo_slice == via_list_slice
def test_slicing(self):
    """Check the length and the boundary commits of several repository slices."""
    # (start, stop, expected number of commits)
    cases = ((0, 4, 4), (1, 2, 1), (10, 15, 5), (10, 20, 10), (5, 100, 95))
    for start, stop, expected_len in cases:
        commits = list(self.repo[start:stop])
        assert len(commits) == expected_len
        assert commits[0] == self.repo.get_commit(commit_idx=start)
        assert commits[-1] == self.repo.get_commit(commit_idx=stop - 1)
def test_branches(self):
    """Check known active and closed branches, and that branch heads resolve to commits."""
    # TODO: Need more tests here

    # active branches
    for branch in ("default", "stable"):
        assert branch in self.repo.branches

    # closed branches
    for branch in ("git", "web"):
        assert branch in self.repo._get_branches(closed=True)

    # every branch head must resolve to a Mercurial commit
    for commit_id in self.repo.branches.values():
        assert isinstance(self.repo.get_commit(commit_id), MercurialCommit)
def test_tip_in_tags(self):
    """``tip`` is always expected among the repository tags."""
    tags = self.repo.tags
    assert "tip" in tags
def test_tip_commit_in_tags(self):
    """The ``tip`` tag must point at the commit returned by ``get_commit()``."""
    default_commit = self.repo.get_commit()
    assert self.repo.tags["tip"] == default_commit.raw_id
def test_initial_commit(self):
    """Check metadata, touched paths and node kinds of the commit at index 0."""
    first = self.repo.get_commit(commit_idx=0)
    author = first.author
    initial_files = [
        b"vcs/__init__.py",
        b"vcs/backends/BaseRepository.py",
        b"vcs/backends/__init__.py",
    ]

    assert first.message == "initial import"
    # NOTE(review): the trailing space suggests an "<email>" part may have been
    # lost from this literal - confirm against the test repository.
    assert author == "Marcin Kuzminski "
    assert author == first.committer

    assert sorted(first.added_paths) == sorted(initial_files)
    assert sorted(first.affected_files) == sorted(initial_files)

    for path in initial_files:
        assert isinstance(first.get_node(path), FileNode)
    for path in (b"", b"vcs", b"vcs/backends"):
        assert isinstance(first.get_node(path), DirNode)

    with pytest.raises(NodeDoesNotExistError):
        first.get_node(path=b"foobar")

    # directory lookups work with and without a trailing slash
    expected_kinds = (
        (b"vcs/", NodeKind.DIR),
        (b"vcs", NodeKind.DIR),
        (b"vcs/__init__.py", NodeKind.FILE),
    )
    for path, kind in expected_kinds:
        node = first.get_node(path)
        assert hasattr(node, "kind")
        assert node.kind == kind
def test_not_existing_commit(self):
    """Lookups of unknown ids and out-of-range indexes must raise ``RepositoryError``."""
    # full-length id, then a short id
    for missing_id in ("abcd" * 10, "erro" * 4):
        with pytest.raises(RepositoryError):
            self.repo.get_commit(missing_id)

    # numeric index just past the end
    with pytest.raises(RepositoryError):
        self.repo.get_commit(commit_idx=self.repo.count() + 1)

    # Small chance we ever get to this one
    with pytest.raises(RepositoryError):
        self.repo.get_commit(commit_idx=2**30)
def test_commit10(self):
    """``README.rst`` at commit index 10 must be a file with the expected text."""
    expected_readme = """===
VCS
===
Various Version Control System management abstraction layer for Python.
Introduction
------------
TODO: To be written...
"""
    readme_node = self.repo.get_commit(commit_idx=10).get_node(b"README.rst")
    assert readme_node.kind == NodeKind.FILE
    assert readme_node.str_content == expected_readme
def test_local_clone(self):
    """A local clone must expose the same commit ids as the source repository."""
    destination = next(REPO_PATH_GENERATOR)
    self.repo._local_clone(destination)
    cloned = MercurialRepository(destination)
    assert self.repo.commit_ids == cloned.commit_ids
def test_local_clone_fails_if_target_exists(self):
    """Cloning onto the repository's own path must raise ``RepositoryError``."""
    with pytest.raises(RepositoryError):
        own_path = self.repo.path
        self.repo._local_clone(own_path)
def test_update(self):
    """After ``_update(<branch>)`` the identified commit must be that branch's head."""
    clone = self.get_clone_repo()
    branch_heads = clone.branches
    for branch in ("default", "stable"):
        clone._update(branch)
        assert branch_heads[branch] == clone._identify()
def test_local_pull_branch(self):
    """Pull two branches, one after another, into an empty repo and compare the heads."""
    target = self.get_empty_repo()
    source = self.get_clone_repo()

    for branch in ("default", "stable"):
        branch_ref = Reference("branch", branch, source.branches[branch])
        target._local_pull(source.path, branch_ref)
        # re-open the target before inspecting its branches
        target = MercurialRepository(target.path)
        assert target.branches[branch] == source.branches[branch]
def test_local_pull_bookmark(self):
    """Pull two bookmark references into an empty repo and check the default head each time."""
    target = self.get_empty_repo()
    source = self.get_clone_repo()
    commits = list(source.get_commits(branch_name="default"))

    # Place both bookmarks in the source before pulling anything.
    bookmark_refs = []
    for name, position in (("foo1", -5), ("foo2", -3)):
        commit_id = commits[position].raw_id
        bookmark_refs.append((Reference("book", name, commit_id), position))
        source._update(commit_id)
        source.bookmark(name)

    for reference, position in bookmark_refs:
        target._local_pull(source.path, reference)
        target = MercurialRepository(target.path)
        assert target.branches["default"] == commits[position].raw_id
def test_local_pull_commit(self):
    """Pull two ``rev`` references into an empty repo and check the default head each time."""
    target = self.get_empty_repo()
    source = self.get_clone_repo()
    commits = list(source.get_commits(branch_name="default"))

    for position in (-5, -3):
        commit_id = commits[position].raw_id
        rev_ref = Reference("rev", commit_id, commit_id)
        target._local_pull(source.path, rev_ref)
        target = MercurialRepository(target.path)
        assert target.branches["default"] == commit_id
def test_local_pull_from_same_repo(self):
    """Pulling from the repository's own path must raise ``ValueError``."""
    own_default = Reference("branch", "default", "")
    with pytest.raises(ValueError):
        self.repo._local_pull(self.repo.path, own_default)
def test_validate_pull_reference_raises_on_missing_reference(self, vcsbackend_hg):
    """Validating a bookmark reference with an unknown commit id must raise."""
    repo = vcsbackend_hg.create_repo(number_of_commits=1)
    missing_ref = Reference("book", "invalid_reference", "a" * 40)
    with pytest.raises(CommitDoesNotExistError):
        repo._validate_pull_reference(missing_ref)
def test_heads(self):
    """The set of heads must equal the set of branch head commit ids."""
    heads = set(self.repo._heads())
    branch_heads = set(self.repo.branches.values())
    assert heads == branch_heads
def test_ancestor(self):
    """``_ancestor`` must return the same commit regardless of argument order."""
    default_ids = [commit.raw_id for commit in self.repo.get_commits(branch_name="default")]
    third_last, fifth_last = default_ids[-3], default_ids[-5]
    assert self.repo._ancestor(third_last, fifth_last) == fifth_last
    assert self.repo._ancestor(fifth_last, third_last) == fifth_last
def test_local_push(self):
    """Push one commit of the default branch into an empty repo and check its head."""
    target = self.get_empty_repo()
    default_commits = list(self.repo.get_commits(branch_name="default"))
    pushed_id = default_commits[-5].raw_id

    self.repo._local_push(pushed_id, target.path)

    reopened = MercurialRepository(target.path)
    assert reopened.branches["default"] == pushed_id
def test_hooks_can_be_enabled_for_local_push(self):
    """``enable_hooks=True`` must be forwarded to the remote's ``push`` as ``hooks=True``."""
    commit_id = "deadbeef"
    destination = "test_group/test_repo"

    remote_patcher = mock.patch.object(self.repo, "_remote")
    with remote_patcher as remote_mock:
        self.repo._local_push(commit_id, destination, enable_hooks=True)

    remote_mock.push.assert_called_once_with([commit_id], destination, hooks=True, push_branches=False)
def test_local_merge(self, vcsbackend_hg):
    """Merge two diverged default branches and verify the resulting merge commit."""
    target = vcsbackend_hg.create_repo(number_of_commits=1)
    source = vcsbackend_hg.clone_repo(target)

    # Each side gets its own, non-conflicting file.
    vcsbackend_hg.add_file(target, b"README_MERGE1", b"Version 1")
    target = MercurialRepository(target.path)
    target_head = target.branches["default"]
    target_ref = Reference(type="branch", name="default", commit_id=target_head)

    vcsbackend_hg.add_file(source, b"README_MERGE2", b"Version 2")
    source = MercurialRepository(source.path)
    source_head = source.branches["default"]
    source_ref = Reference(type="branch", name="default", commit_id=source_head)

    target._local_pull(source.path, source_ref)

    message = "Merge message\n\nDescription:..."
    author_name = "Albert Einstein"
    author_email = "albert@einstein.com"
    merge_commit_id, needs_push = target._local_merge(target_ref, message, author_name, author_email, source_ref)
    assert needs_push

    target = MercurialRepository(target.path)
    assert target.commit_ids[-3] == target_head
    assert target.commit_ids[-2] == source_head

    merge_commit = target.get_commit(merge_commit_id)
    assert merge_commit.message.strip() == message
    assert merge_commit.author == f"{author_name} <{author_email}>"

    # no merge state file may be left in the target
    merge_state = os.path.join(target.path, ".hg", "merge", "state")
    assert not os.path.exists(merge_state)
def test_local_merge_source_is_fast_forward(self, vcsbackend_hg):
    """When only the source moved ahead, the merge result must be the source commit."""
    target = vcsbackend_hg.create_repo(number_of_commits=1)
    source = vcsbackend_hg.clone_repo(target)

    target_head = target.branches["default"]
    target_ref = Reference(type="branch", name="default", commit_id=target_head)

    # Only the source receives a new commit.
    vcsbackend_hg.add_file(source, "README_MERGE2", "Version 2")
    source = MercurialRepository(source.path)
    source_head = source.branches["default"]
    source_ref = Reference(type="branch", name="default", commit_id=source_head)

    target._local_pull(source.path, source_ref)

    message = "Merge message\n\nDescription:..."
    author_name = "Albert Einstein"
    author_email = "albert@einstein.com"
    merge_commit_id, needs_push = target._local_merge(target_ref, message, author_name, author_email, source_ref)
    assert merge_commit_id == source_head
    assert needs_push

    target = MercurialRepository(target.path)
    assert target.commit_ids[-2] == target_head
    assert target.commit_ids[-1] == source_head

    merge_state = os.path.join(target.path, ".hg", "merge", "state")
    assert not os.path.exists(merge_state)
def test_local_merge_source_is_integrated(self, vcsbackend_hg):
    """Merging a reference into itself must return that commit and need no push."""
    target = vcsbackend_hg.create_repo(number_of_commits=1)
    target_head = target.branches["default"]
    target_ref = Reference(type="branch", name="default", commit_id=target_head)

    message = "Merge message\n\nDescription:..."
    author_name = "Albert Einstein"
    author_email = "albert@einstein.com"
    # the same reference is used as both target and source
    merge_commit_id, needs_push = target._local_merge(target_ref, message, author_name, author_email, target_ref)
    assert merge_commit_id == target_head
    assert not needs_push

    target = MercurialRepository(target.path)
    assert target.commit_ids[-1] == target_head

    merge_state = os.path.join(target.path, ".hg", "merge", "state")
    assert not os.path.exists(merge_state)
def test_local_merge_raises_exception_on_conflict(self, vcsbackend_hg):
    """Both sides add ``README_MERGE`` with different content; the merge must raise."""
    target = vcsbackend_hg.create_repo(number_of_commits=1)
    source = vcsbackend_hg.clone_repo(target)

    vcsbackend_hg.add_file(target, "README_MERGE", "Version 1")
    target = MercurialRepository(target.path)
    target_head = target.branches["default"]
    target_ref = Reference(type="branch", name="default", commit_id=target_head)

    vcsbackend_hg.add_file(source, "README_MERGE", "Version 2")
    source = MercurialRepository(source.path)
    source_head = source.branches["default"]
    source_ref = Reference(type="branch", name="default", commit_id=source_head)

    target._local_pull(source.path, source_ref)

    with pytest.raises(RepositoryError):
        target._local_merge(target_ref, "merge_message", "user name", "user@name.com", source_ref)

    # Check we are not left in an intermediate merge state
    merge_state = os.path.join(target.path, ".hg", "merge", "state")
    assert not os.path.exists(merge_state)
def test_local_merge_of_two_branches_of_the_same_repo(self, backend_hg, vcs_repo):
    """Merge branch ``b`` into ``default`` inside one repository and check the merge commit."""
    repo = backend_hg.create_repo(
        [
            {"message": "a"},
            {"message": "b", "branch": "b"},
        ]
    )
    ids_by_message = backend_hg.commit_ids
    target_ref = Reference(type="branch", name="default", commit_id=ids_by_message["a"])
    source_ref = Reference(type="branch", name="b", commit_id=ids_by_message["b"])

    message = "Merge message\n\nDescription:..."
    author_name = "Albert Einstein"
    author_email = "albert@einstein.com"

    scm = repo.scm_instance()
    merge_commit_id, needs_push = scm._local_merge(target_ref, message, author_name, author_email, source_ref)
    assert merge_commit_id != source_ref.commit_id
    assert needs_push is True

    merge_commit = scm.get_commit(merge_commit_id)
    assert merge_commit.merge is True
    assert merge_commit.message == message
def test_maybe_prepare_merge_workspace(self):
    """Preparing a merge workspace must create a repository directory, also when repeated."""
    workspace_args = (1, "pr2", "unused", "unused2")

    workspace = self.repo._maybe_prepare_merge_workspace(*workspace_args)
    assert os.path.isdir(workspace)
    assert MercurialRepository(workspace).branches == self.repo.branches

    # Calling it a second time should also succeed
    workspace = self.repo._maybe_prepare_merge_workspace(*workspace_args)
    assert os.path.isdir(workspace)
def test_cleanup_merge_workspace(self):
    """A prepared merge workspace must no longer exist after cleanup."""
    workspace_path = self.repo._maybe_prepare_merge_workspace(1, "pr3", "unused", "unused2")
    assert os.path.isdir(workspace_path)

    self.repo.cleanup_merge_workspace(1, "pr3")
    assert not os.path.exists(workspace_path)
def test_cleanup_merge_workspace_invalid_workspace_id(self):
    """Cleaning up a workspace that does not exist must still succeed."""
    # Deliberately no assert: the call only has to finish without raising.
    workspace_id = "pr4"
    self.repo.cleanup_merge_workspace(1, workspace_id)
def test_merge_target_is_bookmark(self, vcsbackend_hg):
    """
    Merge a source branch into a target that is referenced by a bookmark.

    Checks the merge response, that both original heads are part of the
    target history, the message and author of the merge commit, and that
    the bookmark points at the merge commit afterwards.
    """
    user_name = "test user"
    user_email = "test@rhodecode.com"

    target_repo = vcsbackend_hg.create_repo(number_of_commits=1)
    source_repo = vcsbackend_hg.clone_repo(target_repo)
    vcsbackend_hg.add_file(target_repo, "README_MERGE1", "Version 1")
    vcsbackend_hg.add_file(source_repo, "README_MERGE2", "Version 2")
    imc = source_repo.in_memory_commit
    imc.add(FileNode(b"file_x", content=source_repo.name))
    imc.commit(message="Automatic commit from repo merge test", author="Automatic ")
    target_commit = target_repo.get_commit()
    source_commit = source_repo.get_commit()

    # Put a bookmark on the head of the target's default branch.
    default_branch = target_repo.DEFAULT_BRANCH_NAME
    bookmark_name = "bookmark"
    target_repo._update(default_branch)
    target_repo.bookmark(bookmark_name)

    target_ref = Reference("book", bookmark_name, target_commit.raw_id)
    source_ref = Reference("branch", default_branch, source_commit.raw_id)
    workspace_id = "test-merge"
    repo_id = repo_id_generator(target_repo.path)
    merge_response = target_repo.merge(
        repo_id,
        workspace_id,
        target_ref,
        source_repo,
        source_ref,
        user_name,
        user_email,
        "merge message 1",
        dry_run=False,
    )
    expected_merge_response = MergeResponse(True, True, merge_response.merge_ref, MergeFailureReason.NONE)
    assert merge_response == expected_merge_response

    # Re-open the target and inspect the resulting history.
    target_repo = backends.get_backend(vcsbackend_hg.alias)(target_repo.path)
    target_commits = list(target_repo.get_commits())
    commit_ids = [c.raw_id for c in target_commits[:-1]]
    assert source_ref.commit_id in commit_ids
    assert target_ref.commit_id in commit_ids

    merge_commit = target_commits[-1]
    assert merge_commit.raw_id == merge_response.merge_ref.commit_id
    assert merge_commit.message.strip() == "merge message 1"
    # The author is composed as "name <email>" (same format as asserted in
    # test_local_merge); the previous literal "test user " had lost the email.
    assert merge_commit.author == f"{user_name} <{user_email}>"

    # Check the bookmark was updated in the target repo
    assert target_repo.bookmarks[bookmark_name] == merge_response.merge_ref.commit_id
def test_merge_source_is_bookmark(self, vcsbackend_hg):
    """Merge a bookmarked source commit into the target's default branch."""
    target = vcsbackend_hg.create_repo(number_of_commits=1)
    source = vcsbackend_hg.clone_repo(target)

    # One extra commit on the source side only.
    in_memory = source.in_memory_commit
    in_memory.add(FileNode(b"file_x", content=source.name))
    in_memory.commit(message="Automatic commit from repo merge test", author="Automatic ")

    target_tip = target.get_commit()
    source_tip = source.get_commit()
    default_branch = target.DEFAULT_BRANCH_NAME
    bookmark_name = "bookmark"
    target_ref = Reference("branch", default_branch, target_tip.raw_id)

    source._update(default_branch)
    source.bookmark(bookmark_name)
    source_ref = Reference("book", bookmark_name, source_tip.raw_id)

    workspace_id = "test-merge"
    repo_id = repo_id_generator(target.path)
    merge_response = target.merge(
        repo_id,
        workspace_id,
        target_ref,
        source,
        source_ref,
        "test user",
        "test@rhodecode.com",
        "merge message 1",
        dry_run=False,
    )
    expected = MergeResponse(True, True, merge_response.merge_ref, MergeFailureReason.NONE)
    assert merge_response == expected

    # Re-open the target: the source commit must sit directly on top of the target commit.
    target = backends.get_backend(vcsbackend_hg.alias)(target.path)
    history_ids = [commit.raw_id for commit in list(target.get_commits())]
    assert source_ref.commit_id == history_ids[-1]
    assert target_ref.commit_id == history_ids[-2]
def test_merge_target_has_multiple_heads(self, vcsbackend_hg):
    """A target with two heads on ``default`` must make the merge fail with the matching reason."""
    target = vcsbackend_hg.create_repo(number_of_commits=2)
    source = vcsbackend_hg.clone_repo(target)
    vcsbackend_hg.add_file(target, "README_MERGE1", "Version 1")
    vcsbackend_hg.add_file(source, "README_MERGE2", "Version 2")

    # add an extra head to the target repo, parented on its first commit
    in_memory = target.in_memory_commit
    in_memory.add(FileNode(b"file_x", content=b"foo"))
    existing_commits = list(target.get_commits())
    in_memory.commit(
        message="Automatic commit from repo merge test",
        author="Automatic ",
        parents=existing_commits[0:1],
    )

    target_tip = target.get_commit()
    source_tip = source.get_commit()
    default_branch = target.DEFAULT_BRANCH_NAME
    target._update(default_branch)

    target_ref = Reference("branch", default_branch, target_tip.raw_id)
    source_ref = Reference("branch", default_branch, source_tip.raw_id)
    workspace_id = "test-merge"

    assert len(target._heads(branch="default")) == 2
    default_heads = target._heads(branch="default")
    expected = MergeResponse(
        False, False, None, MergeFailureReason.HG_TARGET_HAS_MULTIPLE_HEADS, metadata={"heads": default_heads}
    )

    repo_id = repo_id_generator(target.path)
    merge_response = target.merge(
        repo_id,
        workspace_id,
        target_ref,
        source,
        source_ref,
        "test user",
        "test@rhodecode.com",
        "merge message 1",
        dry_run=False,
    )
    assert merge_response == expected
def test_merge_rebase_source_is_updated_bookmark(self, vcsbackend_hg):
    """Merge with ``use_rebase=True`` from a bookmark reference and check the rebased result."""
    target = vcsbackend_hg.create_repo(number_of_commits=1)
    source = vcsbackend_hg.clone_repo(target)
    vcsbackend_hg.add_file(target, b"README_MERGE1", b"Version 1")
    vcsbackend_hg.add_file(source, b"README_MERGE2", b"Version 2")

    in_memory = source.in_memory_commit
    in_memory.add(FileNode(b"file_x", content=safe_bytes(source.name)))
    in_memory.commit(message="Automatic commit from repo merge test", author="Automatic ")

    target_tip = target.get_commit()
    source_tip = source.get_commit()

    # The source gets one more commit after its reference commit was captured.
    vcsbackend_hg.add_file(source, b"LICENSE", b"LICENSE Info")

    default_branch = target.DEFAULT_BRANCH_NAME
    bookmark_name = "bookmark"
    source._update(default_branch)
    source.bookmark(bookmark_name)

    target_ref = Reference("branch", default_branch, target_tip.raw_id)
    source_ref = Reference("book", bookmark_name, source_tip.raw_id)
    repo_id = repo_id_generator(target.path)
    workspace_id = "test-merge"

    merge_response = target.merge(
        repo_id,
        workspace_id,
        target_ref,
        source,
        source_ref,
        "test user",
        "test@rhodecode.com",
        "merge message 1",
        dry_run=False,
        use_rebase=True,
    )
    expected = MergeResponse(True, True, merge_response.merge_ref, MergeFailureReason.NONE)
    assert merge_response == expected

    target = backends.get_backend(vcsbackend_hg.alias)(target.path)
    newest = target.get_commit()
    assert newest.message == source_tip.message
    assert newest.author == source_tip.author
    # This checks that we effectively did a rebase
    assert newest.raw_id != source_tip.raw_id

    # Check the target has only 4 commits: 2 were already in target and
    # only two should have been added
    already_in_target, added_by_merge = 2, 2
    assert len(target.commit_ids) == already_in_target + added_by_merge
class TestGetShadowInstance:
    """Tests for ``get_shadow_instance`` of the Mercurial repository backend."""

    @pytest.fixture()
    def repo(self, vcsbackend_hg):
        """
        Provide the backend repository while ``rhodecode.lib.vcs.connection.Hg`` is mocked.

        ``mock.patch()`` on its own only builds a patcher object; it replaces
        nothing until it is started. The patcher is therefore entered as a
        context manager and the repository is yielded from inside it, so the
        mock is active for the whole test and undone again afterwards.
        """
        _hg_repo = vcsbackend_hg.repo
        # unsafe=True lets the mock expose attributes whose names start with
        # "assert", which a plain Mock rejects.
        connection_mock = mock.Mock(unsafe=True, name="connection.Hg")
        with mock.patch("rhodecode.lib.vcs.connection.Hg", connection_mock):
            yield _hg_repo

    def test_getting_shadow_instance_copies_config(self, repo):
        """The shadow instance's serialized config must equal the original's."""
        shadow = repo.get_shadow_instance(repo.path)
        assert shadow.config.serialize() == repo.config.serialize()

    def test_disables_hooks_section(self, repo):
        """By default the shadow instance's config has no items in the ``hooks`` section."""
        repo.config.set("hooks", "foo", "val")
        shadow = repo.get_shadow_instance(repo.path)
        assert not shadow.config.items("hooks")

    def test_allows_to_keep_hooks(self, repo):
        """With ``enable_hooks=True`` the ``hooks`` section is kept in the shadow config."""
        repo.config.set("hooks", "foo", "val")
        shadow = repo.get_shadow_instance(repo.path, enable_hooks=True)
        assert shadow.config.items("hooks")
class TestMercurialCommit:
@pytest.fixture(autouse=True)
def prepare(self):
    """Open the repository at ``TEST_HG_REPO`` and store it on ``self.repo`` for each test."""
    base_repo = MercurialRepository(TEST_HG_REPO)
    self.repo = base_repo
def _test_equality(self, commit):
    """Assert that ``commit`` equals the commit fetched again by its own index."""
    refetched = self.repo.get_commit(commit_idx=commit.idx)
    assert commit == refetched
def test_equality(self):
    """Commits at indexes 0, 10 and 20 must compare equal to themselves when re-fetched."""
    fetched = []
    for idx in (0, 10, 20):
        fetched.append(self.repo.get_commit(commit_idx=idx))
    for commit in fetched:
        self._test_equality(commit)
def test_default_commit(self):
    """``get_commit`` without a concrete id or index must return the tip commit."""
    tip = self.repo.get_commit("tip")
    for lookup in ({}, {"commit_id": None}, {"commit_idx": None}):
        assert tip == self.repo.get_commit(**lookup)

    # the last element of the repository is the tip as well
    last_slice = list(self.repo[-1:])
    assert tip == last_slice[0]
def test_root_node(self):
    """``commit.root`` and ``commit.get_node(b"")`` must be the very same object."""
    tip = self.repo.get_commit("tip")
    root = tip.root
    assert root is tip.get_node(b"")
def test_lazy_fetch(self):
    """
    Check that a commit's node cache grows only as nodes are walked.

    The order of the statements is essential: every access below changes
    what has been cached so far. Originally written by running command
    after command in a shell.
    """
    changeset = self.repo.get_commit(commit_idx=45)
    assert len(changeset.nodes) == 0

    root_node = changeset.root
    assert len(changeset.nodes) == 1
    assert len(root_node.nodes) == 8
    # listing the root's children registers them on the commit too
    assert len(changeset.nodes) == 9

    docs_node = changeset.get_node(b"docs")
    # nothing new was loaded, the docs dir had been cached already
    assert len(changeset.nodes) == 9
    assert len(docs_node.nodes) == 8
    # listing the docs children registers them on the commit too
    assert len(changeset.nodes) == 17

    # repeated lookups hand back the identical cached object
    assert docs_node is changeset.get_node(b"docs")
    assert docs_node is root_node.nodes[0]
    assert docs_node is root_node.dirs[0]
    assert docs_node is changeset.get_node(b"docs")
def test_nodes_with_commit(self):
    """Repeated ``get_node`` calls for one path must return the same object."""
    changeset = self.repo.get_commit(commit_idx=45)
    assert isinstance(changeset.root, RootNode)

    for path in (b"docs", b"docs/api", b"docs/api/index.rst"):
        node = changeset.get_node(path)
        assert node is changeset.get_node(path)
def test_branch_and_tags(self):
    """Check branch and tags of a few known commits."""
    # early commits live on "default" and carry no tags
    for idx in (0, 10):
        commit = self.repo.get_commit(commit_idx=idx)
        assert commit.branch == "default"
        assert commit.tags == []

    assert self.repo.get_commit(commit_idx=44).branch == "web"

    tip = self.repo.get_commit("tip")
    assert "tip" in tip.tags
def test_bookmarks(self):
    """The commit at index 0 must not carry any bookmarks."""
    first = self.repo.get_commit(commit_idx=0)
    assert first.bookmarks == []
def _test_file_size(self, idx, path, size):
    """Assert that ``path`` at commit index ``idx`` is a file of ``size`` bytes."""
    commit = self.repo.get_commit(commit_idx=idx)
    node = commit.get_node(path)
    assert node.is_file()
    assert node.size == size
def test_file_size(self):
    """Check the sizes of two files at three different commits each."""
    # path -> ((commit index, expected size), ...)
    expected_sizes = {
        b"setup.py": ((10, 1068), (20, 1106), (60, 1074)),
        b"vcs/backends/base.py": ((10, 2921), (20, 3936), (60, 6189)),
    }
    for path, checks in expected_sizes.items():
        for idx, size in checks:
            self._test_file_size(idx, path, size)
def test_file_history_from_commits(self):
    """Check the history of ``setup.py`` as seen from three different commits."""
    first_change = "3803844fdbd3b711175fc3da9bdacfcd6d29a6fb"
    second_change = "eada5a770da98ab0dd7325e29d00e0714f228d09"

    node = self.repo[10].get_node(b"setup.py")
    assert [first_change] == [commit.raw_id for commit in node.history]

    node = self.repo[20].get_node(b"setup.py")
    assert [second_change, first_change] == [commit.raw_id for commit in node.history]

    # special case: the history is taken from a commit that itself changed
    # this file, so that commit has to be part of the history as well
    node = self.repo.get_commit(second_change).get_node(b"setup.py")
    assert [second_change, first_change] == [commit.raw_id for commit in node.history]
def test_file_history(self):
    """Check that known commit indexes are part of each file's history."""
    # Only presence can be checked: the listed indexes have to be a subset
    # of the history, because this test cannot be updated every time a file
    # is changed.
    known_changes = {
        b"setup.py": [7, 18, 45, 46, 47, 69, 77],
        b"vcs/nodes.py": [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60, 61, 73, 76],
        b"vcs/backends/hg.py": [
            4, 5, 6, 11, 12, 13, 14, 15, 16,
            21, 22, 23, 26, 27, 28, 30, 31, 33,
            35, 36, 37, 38, 39, 40, 41, 44, 45,
            47, 48, 49, 53, 54, 55, 58, 60, 61,
            67, 68, 69, 70, 73, 77, 78, 79, 82,
        ],
    }
    for path, indexes in known_changes.items():
        # look at the file from the newest listed commit
        newest = self.repo.get_commit(commit_idx=indexes[-1])
        history_node = newest.get_node(path)
        node_indexes = [commit.idx for commit in history_node.history]
        assert set(indexes).issubset(set(node_indexes)), (
            "We assumed that %s is subset of commits for which file %s "
            "has been changed, and history of that node returned: %s" % (indexes, path, node_indexes)
        )
def test_file_annotate(self):
    """
    Check per-line annotation of three files at commit indexes 89, 20 and 55.

    For every file/commit pair the test verifies that

    * the commit id stored in each annotation row matches the ``raw_id`` of
      the commit returned by that row's loader,
    * the number of annotated lines equals ``lines_no``,
    * the commit indexes of the annotated lines equal ``commits``.
    """
    # vcs/exceptions.py is annotated identically at all three commits
    exceptions_commits = [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 17, 16, 16, 18, 18, 18]
    # file name -> commit index -> expected annotation
    files = {
        b"vcs/backends/__init__.py": {
            89: {
                "lines_no": 31,
                "commits": [
                    32, 32, 61, 32, 32, 37, 32, 32, 32, 44,
                    37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
                    32, 32, 32, 32, 37, 32, 37, 37, 32, 32,
                    32,
                ],
            },
            20: {"lines_no": 1, "commits": [4]},
            55: {
                "lines_no": 31,
                "commits": [
                    32, 32, 45, 32, 32, 37, 32, 32, 32, 44,
                    37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
                    32, 32, 32, 32, 37, 32, 37, 37, 32, 32,
                    32,
                ],
            },
        },
        b"vcs/exceptions.py": {
            89: {"lines_no": 18, "commits": list(exceptions_commits)},
            20: {"lines_no": 18, "commits": list(exceptions_commits)},
            55: {"lines_no": 18, "commits": list(exceptions_commits)},
        },
        b"MANIFEST.in": {
            89: {"lines_no": 5, "commits": [7, 7, 7, 71, 71]},
            20: {"lines_no": 3, "commits": [7, 7, 7]},
            55: {"lines_no": 3, "commits": [7, 7, 7]},
        },
    }

    for file_name, commit_dict in files.items():
        for idx, expected in commit_dict.items():
            commit = self.repo.get_commit(commit_idx=idx)

            # Annotate once and reuse the rows. Each row carries the commit id
            # at position 1 and a callable returning the commit at position 2.
            annotation = list(commit.get_file_annotate(file_name))
            line_commits = [row[2]() for row in annotation]

            assert [row[1] for row in annotation] == [c.raw_id for c in line_commits]
            assert len(annotation) == expected["lines_no"]

            actual_indexes = [c.idx for c in line_commits]
            expected_indexes = expected["commits"]
            assert actual_indexes == expected_indexes, (
                "The lists of commit for %s@commit_id %s "
                "from annotation list should match each other, "
                "got \n%s \nvs \n%s " % (file_name, idx, actual_indexes, expected_indexes)
            )
def test_commit_state(self):
    """
    Tests which files have been added/changed/removed at particular commit
    """
    # commit_id 46ad32a4f974:
    # hg st --rev 46ad32a4f974
    #    changed: 13
    #    added:   20
    #    removed: 1
    changed = {
        b".hgignore",
        b"README.rst",
        b"docs/conf.py",
        b"docs/index.rst",
        b"setup.py",
        b"tests/test_hg.py",
        b"tests/test_nodes.py",
        b"vcs/__init__.py",
        b"vcs/backends/__init__.py",
        b"vcs/backends/base.py",
        b"vcs/backends/hg.py",
        b"vcs/nodes.py",
        b"vcs/utils/__init__.py",
    }
    added = {
        b"docs/api/backends/hg.rst",
        b"docs/api/backends/index.rst",
        b"docs/api/index.rst",
        b"docs/api/nodes.rst",
        b"docs/api/web/index.rst",
        b"docs/api/web/simplevcs.rst",
        b"docs/installation.rst",
        b"docs/quickstart.rst",
        b"setup.cfg",
        b"vcs/utils/baseui_config.py",
        b"vcs/utils/web.py",
        b"vcs/web/__init__.py",
        b"vcs/web/exceptions.py",
        b"vcs/web/simplevcs/__init__.py",
        b"vcs/web/simplevcs/exceptions.py",
        b"vcs/web/simplevcs/middleware.py",
        b"vcs/web/simplevcs/models.py",
        b"vcs/web/simplevcs/settings.py",
        b"vcs/web/simplevcs/utils.py",
        b"vcs/web/simplevcs/views.py",
    }
    removed = {b"docs/api.rst"}

    commit64 = self.repo.get_commit("46ad32a4f974")
    assert set(commit64.added_paths) == added
    assert set(commit64.changed_paths) == changed
    assert set(commit64.removed_paths) == removed

    # commit_id b090f22d27d6:
    # hg st --rev b090f22d27d6
    #    changed: 1
    #    added:   0
    #    removed: 0
    commit88 = self.repo.get_commit("b090f22d27d6")
    assert set(commit88.added_paths) == set()
    assert set(commit88.changed_paths) == {b".hgignore"}
    assert set(commit88.removed_paths) == set()

    #
    # 85:
    #    added:   2 [
    #       'vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py']
    #    changed: 4 ['vcs/web/simplevcs/models.py', ...]
    #    removed: 1 ['vcs/utils/web.py']
    commit85 = self.repo.get_commit(commit_idx=85)
    assert set(commit85.added_paths) == {b"vcs/utils/diffs.py", b"vcs/web/simplevcs/views/diffs.py"}
    assert set(commit85.changed_paths) == {
        b"vcs/web/simplevcs/models.py",
        b"vcs/web/simplevcs/utils.py",
        b"vcs/web/simplevcs/views/__init__.py",
        b"vcs/web/simplevcs/views/repository.py",
    }
    assert set(commit85.removed_paths) == {b"vcs/utils/web.py"}
def test_files_state(self):
    """
    Tests state of FileNodes.
    """
    commit = self.repo.get_commit(commit_idx=85)
    node = commit.get_node(b"vcs/utils/diffs.py")
    assert node.bytes_path in commit.added_paths

    commit = self.repo.get_commit(commit_idx=88)
    node = commit.get_node(b".hgignore")
    assert node.bytes_path in commit.changed_paths

    commit = self.repo.get_commit(commit_idx=85)
    node = commit.get_node(b"setup.py")
    assert node.bytes_path not in commit.affected_files

    # A path removed by a commit cannot be fetched from that commit:
    # get_node raises NodeDoesNotExistError
    commit = self.repo.get_commit(commit_idx=2)
    path = b"vcs/backends/BaseRepository.py"
    with pytest.raises(NodeDoesNotExistError):
        commit.get_node(path)
    # but it would be one of ``removed`` (commit's attribute)
    assert path in commit.removed_paths
def test_commit_message_is_unicode(self):
    """Every commit yielded by iterating the repository has a ``str`` message."""
    for cm in self.repo:
        assert isinstance(cm.message, str)
def test_commit_author_is_unicode(self):
    """Every commit yielded by iterating the repository has a ``str`` author."""
    for cm in self.repo:
        assert isinstance(cm.author, str)
def test_repo_files_content_type(self):
    """
    For each file node yielded by iterating the ``/`` node of commit 100,
    ``content`` must be ``bytes`` and ``str_content`` must be ``str``.
    """
    test_commit = self.repo.get_commit(commit_idx=100)
    for node in test_commit.get_node(b"/"):
        # Only file nodes are checked; everything else is skipped.
        if not node.is_file():
            continue
        assert isinstance(node.content, bytes)
        assert isinstance(node.str_content, str)
def test_wrong_path(self):
    """Requesting a node at a path that holds no such file raises ``VCSError``."""
    # 'setup.py' exists in the root dir, but not under 'foo/bar/'.
    missing_path = b"foo/bar/setup.py"
    expect_vcs_error = pytest.raises(VCSError)
    with expect_vcs_error:
        latest = self.repo.get_commit()
        latest.get_node(missing_path)
def test_author_email(self):
    """Check ``author_email`` of three known commits."""
    # (commit id, expected e-mail); the last commit is expected to
    # report an empty e-mail.
    expectations = (
        ("b986218ba1c9", "marcin@python-blog.com"),
        ("3803844fdbd3", "lukasz.balcerzak@python-center.pl"),
        ("84478366594b", ""),
    )
    for commit_id, expected_email in expectations:
        commit = self.repo.get_commit(commit_id)
        assert expected_email == commit.author_email
def test_author_username(self):
    """Check ``author_name`` of three known commits."""
    # (commit id, expected author name)
    expectations = (
        ("b986218ba1c9", "Marcin Kuzminski"),
        ("3803844fdbd3", "Lukasz Balcerzak"),
        ("84478366594b", "marcink"),
    )
    for commit_id, expected_name in expectations:
        commit = self.repo.get_commit(commit_id)
        assert expected_name == commit.author_name
class TestLargeFileRepo:
    """Checks largefile handling against the ``largefiles`` test repository."""

    def test_large_file(self, backend_hg):
        """
        The node at ``.hglf/thisfileislarge`` in the tip commit must yield a
        largefile node with the expected size and name.
        """
        conf = make_db_config()
        repo = backend_hg.create_test_repo("largefiles", conf)

        tip = repo.scm_instance().get_commit()
        node = tip.get_node(b".hglf/thisfileislarge")

        lf_node = node.get_largefile_node()

        assert lf_node.is_largefile() is True
        assert lf_node.size == 1024000
        assert lf_node.name == b".hglf/thisfileislarge"
class TestGetBranchName:
    """Tests for ``MercurialRepository._get_branch_name``."""

    @pytest.fixture(autouse=True)
    def prepare(self):
        """Give every test a repository instance opened at ``TEST_HG_REPO``."""
        self.repo = MercurialRepository(TEST_HG_REPO)

    def test_returns_ref_name_when_type_is_branch(self):
        """A reference of type ``branch`` resolves to its own name."""
        ref = self._create_ref("branch", "fake-name")

        result = self.repo._get_branch_name(ref)

        assert result == ref.name

    @pytest.mark.parametrize("type_", ("book", "tag"))
    def test_queries_remote_when_type_is_not_branch(self, type_):
        """For other reference types the branch comes from ``_remote.ctx_branch``."""
        ref = self._create_ref(type_, "wrong-fake-name")

        with mock.patch.object(self.repo, "_remote") as remote_mock:
            remote_mock.ctx_branch.return_value = "fake-name"
            result = self.repo._get_branch_name(ref)

        # The result must be the remote's answer, not the reference name.
        assert result == "fake-name"
        remote_mock.ctx_branch.assert_called_once_with(ref.commit_id)

    def _create_ref(self, type_, name):
        """
        Build a mock reference.

        :param type_: value assigned to ``ref.type``
        :param name: value assigned to ``ref.name``
        :return: a ``mock.Mock`` with ``type``, ``name`` and a fixed
            ``commit_id`` of ``"deadbeef"``
        """
        ref = mock.Mock()
        ref.type = type_
        # Honour the requested name instead of a hard-coded value, so that
        # callers really control what ``ref.name`` holds.
        ref.name = name
        ref.commit_id = "deadbeef"
        return ref
class TestIsTheSameBranch:
    """Tests for ``MercurialRepository._is_the_same_branch``."""

    @pytest.fixture(autouse=True)
    def prepare(self):
        """Give every test a repository instance opened at ``TEST_HG_REPO``."""
        self.repo = MercurialRepository(TEST_HG_REPO)

    def test_returns_true_when_branches_are_equal(self):
        """Both references resolving to the same branch name gives ``True``."""
        source_ref = mock.Mock(name="source-ref")
        target_ref = mock.Mock(name="target-ref")
        branch_name_patcher = mock.patch.object(self.repo, "_get_branch_name", return_value="default")

        with branch_name_patcher as branch_name_mock:
            result = self.repo._is_the_same_branch(source_ref, target_ref)

        # The branch name is looked up for the source first, then the target.
        expected_calls = [mock.call(source_ref), mock.call(target_ref)]
        assert branch_name_mock.call_args_list == expected_calls
        assert result is True

    def test_returns_false_when_branches_are_not_equal(self):
        """References resolving to different branch names give ``False``."""
        # ``Mock(name=...)`` only labels the mock itself; the ``name``
        # attribute has to be assigned separately on each reference.
        source_ref = mock.Mock(name="source-ref")
        source_ref.name = "source-branch"
        target_ref = mock.Mock(name="target-ref")
        target_ref.name = "target-branch"

        def side_effect(ref):
            # Stand-in for ``_get_branch_name``: report the reference name.
            return ref.name

        branch_name_patcher = mock.patch.object(self.repo, "_get_branch_name", side_effect=side_effect)

        with branch_name_patcher as branch_name_mock:
            result = self.repo._is_the_same_branch(source_ref, target_ref)

        expected_calls = [mock.call(source_ref), mock.call(target_ref)]
        assert branch_name_mock.call_args_list == expected_calls
        assert result is False