# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os

import mock
import pytest

from rhodecode.lib.str_utils import safe_bytes
from rhodecode.lib.utils import make_db_config
from rhodecode.lib.vcs import backends
from rhodecode.lib.vcs.backends.base import Reference, MergeResponse, MergeFailureReason
from rhodecode.lib.vcs.backends.hg import MercurialRepository, MercurialCommit
from rhodecode.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError, CommitDoesNotExistError
from rhodecode.lib.vcs.nodes import FileNode, NodeKind, DirNode, RootNode
from rhodecode.tests import TEST_HG_REPO, TEST_HG_REPO_CLONE, repo_id_generator

pytestmark = pytest.mark.backends("hg")


def repo_path_generator():
    """
    Return a different path to be used for cloning repos.
    """
    i = 0
    while True:
        i += 1
        yield f"{TEST_HG_REPO_CLONE}-{i:d}"


REPO_PATH_GENERATOR = repo_path_generator()
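# Module-level generator shared by the tests below: each next(REPO_PATH_GENERATOR)
# call yields a fresh clone path, so tests never reuse the same on-disk location.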
r5647 | class TestMercurialRepository: | |||
r1 | # pylint: disable=protected-access | |||
r5647 | @pytest.fixture(autouse=True) | |||
def prepare(self): | ||||
self.repo = MercurialRepository(TEST_HG_REPO) | ||||
r1 | ||||
def get_clone_repo(self): | ||||
""" | ||||
Return a clone of the base repo. | ||||
""" | ||||
clone_path = next(REPO_PATH_GENERATOR) | ||||
r5607 | repo_clone = MercurialRepository(clone_path, create=True, src_url=self.repo.path) | |||
r1 | ||||
return repo_clone | ||||
def get_empty_repo(self): | ||||
""" | ||||
Return an empty repo. | ||||
""" | ||||
return MercurialRepository(next(REPO_PATH_GENERATOR), create=True) | ||||
def test_wrong_repo_path(self): | ||||
r5607 | wrong_repo_path = "/tmp/errorrepo_hg" | |||
r1 | with pytest.raises(RepositoryError): | |||
MercurialRepository(wrong_repo_path) | ||||
def test_unicode_path_repo(self): | ||||
with pytest.raises(VCSError): | ||||
r5607 | MercurialRepository("iShouldFail") | |||
r1 | ||||
def test_unicode_commit_id(self): | ||||
with pytest.raises(CommitDoesNotExistError): | ||||
r5607 | self.repo.get_commit("unicode-commit-id") | |||
r1 | with pytest.raises(CommitDoesNotExistError): | |||
r5607 | self.repo.get_commit("unÃcøde-spéçial-chärÃ¥cter-commit-id") | |||
r1 | ||||
def test_unicode_bookmark(self): | ||||
r5607 | self.repo.bookmark("unicode-bookmark") | |||
self.repo.bookmark("unÃcøde-spéçial-chärÃ¥cter-bookmark") | ||||
r1 | ||||
def test_unicode_branch(self): | ||||
with pytest.raises(KeyError): | ||||
r5607 | assert self.repo.branches["unicode-branch"] | |||
r1 | with pytest.raises(KeyError): | |||
r5607 | assert self.repo.branches["unÃcøde-spéçial-chärÃ¥cter-branch"] | |||
r1 | ||||
def test_repo_clone(self): | ||||
if os.path.exists(TEST_HG_REPO_CLONE): | ||||
r5647 | pytest.fail( | |||
f"Cannot test mercurial clone repo as location {TEST_HG_REPO_CLONE} already exists. You should manually remove it first." | ||||
r5607 | ) | |||
r1 | ||||
repo = MercurialRepository(TEST_HG_REPO) | ||||
r5607 | repo_clone = MercurialRepository(TEST_HG_REPO_CLONE, create=True, src_url=TEST_HG_REPO) | |||
r1 | assert len(repo.commit_ids) == len(repo_clone.commit_ids) | |||
# Checking hashes of commits should be enough | ||||
for commit in repo.get_commits(): | ||||
raw_id = commit.raw_id | ||||
assert raw_id == repo_clone.get_commit(raw_id).raw_id | ||||
def test_repo_clone_with_update(self): | ||||
repo = MercurialRepository(TEST_HG_REPO) | ||||
repo_clone = MercurialRepository( | ||||
r5607 | TEST_HG_REPO_CLONE + "_w_update", create=True, src_url=TEST_HG_REPO, do_workspace_checkout=True | |||
) | ||||
r1 | assert len(repo.commit_ids) == len(repo_clone.commit_ids) | |||
# check if current workdir was updated | ||||
r5607 | assert os.path.isfile(os.path.join(TEST_HG_REPO_CLONE + "_w_update", "MANIFEST.in")) | |||
r1 | ||||
def test_repo_clone_without_update(self): | ||||
repo = MercurialRepository(TEST_HG_REPO) | ||||
repo_clone = MercurialRepository( | ||||
r5607 | TEST_HG_REPO_CLONE + "_wo_update", create=True, src_url=TEST_HG_REPO, do_workspace_checkout=False | |||
) | ||||
r1 | assert len(repo.commit_ids) == len(repo_clone.commit_ids) | |||
r5607 | assert not os.path.isfile(os.path.join(TEST_HG_REPO_CLONE + "_wo_update", "MANIFEST.in")) | |||
r1 | ||||
def test_commit_ids(self): | ||||
# there are 21 commits at bitbucket now | ||||
# so we can assume they would be available from now on | ||||
r5607 | subset = { | |||
"b986218ba1c9b0d6a259fac9b050b1724ed8e545", | ||||
"3d8f361e72ab303da48d799ff1ac40d5ac37c67e", | ||||
"6cba7170863a2411822803fa77a0a264f1310b35", | ||||
"56349e29c2af3ac913b28bde9a2c6154436e615b", | ||||
"2dda4e345facb0ccff1a191052dd1606dba6781d", | ||||
"6fff84722075f1607a30f436523403845f84cd9e", | ||||
"7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7", | ||||
"3803844fdbd3b711175fc3da9bdacfcd6d29a6fb", | ||||
"dc5d2c0661b61928834a785d3e64a3f80d3aad9c", | ||||
"be90031137367893f1c406e0a8683010fd115b79", | ||||
"db8e58be770518cbb2b1cdfa69146e47cd481481", | ||||
"84478366594b424af694a6c784cb991a16b87c21", | ||||
"17f8e105dddb9f339600389c6dc7175d395a535c", | ||||
"20a662e756499bde3095ffc9bc0643d1def2d0eb", | ||||
"2e319b85e70a707bba0beff866d9f9de032aa4f9", | ||||
"786facd2c61deb9cf91e9534735124fb8fc11842", | ||||
"94593d2128d38210a2fcd1aabff6dda0d6d9edf8", | ||||
"aa6a0de05b7612707db567078e130a6cd114a9a7", | ||||
"eada5a770da98ab0dd7325e29d00e0714f228d09", | ||||
} | ||||
r1 | assert subset.issubset(set(self.repo.commit_ids)) | |||
# check if we have the proper order of commits | ||||
org = [ | ||||
r5607 | "b986218ba1c9b0d6a259fac9b050b1724ed8e545", | |||
"3d8f361e72ab303da48d799ff1ac40d5ac37c67e", | ||||
"6cba7170863a2411822803fa77a0a264f1310b35", | ||||
"56349e29c2af3ac913b28bde9a2c6154436e615b", | ||||
"2dda4e345facb0ccff1a191052dd1606dba6781d", | ||||
"6fff84722075f1607a30f436523403845f84cd9e", | ||||
"7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7", | ||||
"3803844fdbd3b711175fc3da9bdacfcd6d29a6fb", | ||||
"dc5d2c0661b61928834a785d3e64a3f80d3aad9c", | ||||
"be90031137367893f1c406e0a8683010fd115b79", | ||||
"db8e58be770518cbb2b1cdfa69146e47cd481481", | ||||
"84478366594b424af694a6c784cb991a16b87c21", | ||||
"17f8e105dddb9f339600389c6dc7175d395a535c", | ||||
"20a662e756499bde3095ffc9bc0643d1def2d0eb", | ||||
"2e319b85e70a707bba0beff866d9f9de032aa4f9", | ||||
"786facd2c61deb9cf91e9534735124fb8fc11842", | ||||
"94593d2128d38210a2fcd1aabff6dda0d6d9edf8", | ||||
"aa6a0de05b7612707db567078e130a6cd114a9a7", | ||||
"eada5a770da98ab0dd7325e29d00e0714f228d09", | ||||
"2c1885c735575ca478bf9e17b0029dca68824458", | ||||
"d9bcd465040bf869799b09ad732c04e0eea99fe9", | ||||
"469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7", | ||||
"4fb8326d78e5120da2c7468dcf7098997be385da", | ||||
"62b4a097164940bd66030c4db51687f3ec035eed", | ||||
"536c1a19428381cfea92ac44985304f6a8049569", | ||||
"965e8ab3c44b070cdaa5bf727ddef0ada980ecc4", | ||||
"9bb326a04ae5d98d437dece54be04f830cf1edd9", | ||||
"f8940bcb890a98c4702319fbe36db75ea309b475", | ||||
"ff5ab059786ebc7411e559a2cc309dfae3625a3b", | ||||
"6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08", | ||||
"ee87846a61c12153b51543bf860e1026c6d3dcba", | ||||
r1 | ] | |||
assert org == self.repo.commit_ids[:31] | ||||
def test_iter_slice(self): | ||||
sliced = list(self.repo[:10]) | ||||
itered = list(self.repo)[:10] | ||||
assert sliced == itered | ||||
def test_slicing(self): | ||||
# expected slice sizes: 4, 1, 5, 10, 95
r5607 | for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5), (10, 20, 10), (5, 100, 95)]: | |||
r1 | indexes = list(self.repo[sfrom:sto]) | |||
assert len(indexes) == size | ||||
assert indexes[0] == self.repo.get_commit(commit_idx=sfrom) | ||||
assert indexes[-1] == self.repo.get_commit(commit_idx=sto - 1) | ||||
def test_branches(self): | ||||
# TODO: Need more tests here | ||||
# active branches | ||||
r5607 | assert "default" in self.repo.branches | |||
assert "stable" in self.repo.branches | ||||
r1 | ||||
# closed | ||||
r5607 | assert "git" in self.repo._get_branches(closed=True) | |||
assert "web" in self.repo._get_branches(closed=True) | ||||
r1 | ||||
r5647 | for name, commit_id in self.repo.branches.items(): | |||
assert isinstance(self.repo.get_commit(commit_id), MercurialCommit) | ||||
r1 | ||||
def test_tip_in_tags(self): | ||||
# tip is always a tag | ||||
r5607 | assert "tip" in self.repo.tags | |||
r1 | ||||
def test_tip_commit_in_tags(self): | ||||
tip = self.repo.get_commit() | ||||
r5607 | assert self.repo.tags["tip"] == tip.raw_id | |||
r1 | ||||
def test_initial_commit(self): | ||||
init_commit = self.repo.get_commit(commit_idx=0) | ||||
init_author = init_commit.author | ||||
r5607 | assert init_commit.message == "initial import" | |||
assert init_author == "Marcin Kuzminski <marcin@python-blog.com>" | ||||
r1 | assert init_author == init_commit.committer | |||
r5647 | assert sorted(init_commit.added_paths) == sorted( | |||
r5607 | [ | |||
r5647 | b"vcs/__init__.py", | |||
b"vcs/backends/BaseRepository.py", | ||||
b"vcs/backends/__init__.py", | ||||
r5607 | ] | |||
) | ||||
r5647 | assert sorted(init_commit.affected_files) == sorted( | |||
[ | ||||
b"vcs/__init__.py", | ||||
b"vcs/backends/BaseRepository.py", | ||||
b"vcs/backends/__init__.py", | ||||
] | ||||
) | ||||
r1 | ||||
r5647 | for path in (b"vcs/__init__.py", b"vcs/backends/BaseRepository.py", b"vcs/backends/__init__.py"): | |||
assert isinstance(init_commit.get_node(path), FileNode) | ||||
for path in (b"", b"vcs", b"vcs/backends"): | ||||
assert isinstance(init_commit.get_node(path), DirNode) | ||||
r1 | ||||
with pytest.raises(NodeDoesNotExistError): | ||||
r5647 | init_commit.get_node(path=b"foobar") | |||
r1 | ||||
r5647 | node = init_commit.get_node(b"vcs/") | |||
r5607 | assert hasattr(node, "kind") | |||
r1 | assert node.kind == NodeKind.DIR | |||
r5647 | node = init_commit.get_node(b"vcs") | |||
r5607 | assert hasattr(node, "kind") | |||
r1 | assert node.kind == NodeKind.DIR | |||
r5647 | node = init_commit.get_node(b"vcs/__init__.py") | |||
r5607 | assert hasattr(node, "kind") | |||
r1 | assert node.kind == NodeKind.FILE | |||
def test_not_existing_commit(self): | ||||
# rawid | ||||
with pytest.raises(RepositoryError): | ||||
r5607 | self.repo.get_commit("abcd" * 10) | |||
r1 | # shortid | |||
with pytest.raises(RepositoryError): | ||||
r5607 | self.repo.get_commit("erro" * 4) | |||
r1 | # numeric | |||
with pytest.raises(RepositoryError): | ||||
self.repo.get_commit(commit_idx=self.repo.count() + 1) | ||||
# Small chance we ever get to this one | ||||
idx = pow(2, 30) | ||||
with pytest.raises(RepositoryError): | ||||
self.repo.get_commit(commit_idx=idx) | ||||
def test_commit10(self): | ||||
commit10 = self.repo.get_commit(commit_idx=10) | ||||
r5647 | readme = """=== | |||
r1 | VCS | |||
=== | ||||
Various Version Control System management abstraction layer for Python. | ||||
Introduction | ||||
------------ | ||||
TODO: To be written... | ||||
""" | ||||
r5647 | node = commit10.get_node(b"README.rst") | |||
r1 | assert node.kind == NodeKind.FILE | |||
r5647 | assert node.str_content == readme | |||
r1 | ||||
def test_local_clone(self): | ||||
clone_path = next(REPO_PATH_GENERATOR) | ||||
self.repo._local_clone(clone_path) | ||||
repo_clone = MercurialRepository(clone_path) | ||||
assert self.repo.commit_ids == repo_clone.commit_ids | ||||
def test_local_clone_fails_if_target_exists(self): | ||||
with pytest.raises(RepositoryError): | ||||
self.repo._local_clone(self.repo.path) | ||||
def test_update(self): | ||||
repo_clone = self.get_clone_repo() | ||||
branches = repo_clone.branches | ||||
r5607 | repo_clone._update("default") | |||
assert branches["default"] == repo_clone._identify() | ||||
repo_clone._update("stable") | ||||
assert branches["stable"] == repo_clone._identify() | ||||
r1 | ||||
def test_local_pull_branch(self): | ||||
target_repo = self.get_empty_repo() | ||||
source_repo = self.get_clone_repo() | ||||
r5607 | default = Reference("branch", "default", source_repo.branches["default"]) | |||
r1 | target_repo._local_pull(source_repo.path, default) | |||
target_repo = MercurialRepository(target_repo.path) | ||||
r5607 | assert target_repo.branches["default"] == source_repo.branches["default"] | |||
r1 | ||||
r5607 | stable = Reference("branch", "stable", source_repo.branches["stable"]) | |||
r1 | target_repo._local_pull(source_repo.path, stable) | |||
target_repo = MercurialRepository(target_repo.path) | ||||
r5607 | assert target_repo.branches["stable"] == source_repo.branches["stable"] | |||
r1 | ||||
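# Mercurial bookmarks are movable pointers to individual commits (roughly the
# equivalent of Git branch heads); pulling a bookmark reference below should
# advance the head of the target's default branch to the bookmarked commit.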
def test_local_pull_bookmark(self): | ||||
target_repo = self.get_empty_repo() | ||||
source_repo = self.get_clone_repo() | ||||
r5607 | commits = list(source_repo.get_commits(branch_name="default")) | |||
r1 | foo1_id = commits[-5].raw_id | |||
r5607 | foo1 = Reference("book", "foo1", foo1_id) | |||
r1 | source_repo._update(foo1_id) | |||
r5607 | source_repo.bookmark("foo1") | |||
r1 | ||||
foo2_id = commits[-3].raw_id | ||||
r5607 | foo2 = Reference("book", "foo2", foo2_id) | |||
r1 | source_repo._update(foo2_id) | |||
r5607 | source_repo.bookmark("foo2") | |||
r1 | ||||
target_repo._local_pull(source_repo.path, foo1) | ||||
target_repo = MercurialRepository(target_repo.path) | ||||
r5607 | assert target_repo.branches["default"] == commits[-5].raw_id | |||
r1 | ||||
target_repo._local_pull(source_repo.path, foo2) | ||||
target_repo = MercurialRepository(target_repo.path) | ||||
r5607 | assert target_repo.branches["default"] == commits[-3].raw_id | |||
r1 | ||||
def test_local_pull_commit(self): | ||||
target_repo = self.get_empty_repo() | ||||
source_repo = self.get_clone_repo() | ||||
r5607 | commits = list(source_repo.get_commits(branch_name="default")) | |||
r1 | commit_id = commits[-5].raw_id | |||
r5607 | commit = Reference("rev", commit_id, commit_id) | |||
r1 | target_repo._local_pull(source_repo.path, commit) | |||
target_repo = MercurialRepository(target_repo.path) | ||||
r5607 | assert target_repo.branches["default"] == commit_id | |||
r1 | ||||
commit_id = commits[-3].raw_id | ||||
r5607 | commit = Reference("rev", commit_id, commit_id) | |||
r1 | target_repo._local_pull(source_repo.path, commit) | |||
target_repo = MercurialRepository(target_repo.path) | ||||
r5607 | assert target_repo.branches["default"] == commit_id | |||
r1 | ||||
def test_local_pull_from_same_repo(self): | ||||
r5647 | reference = Reference("branch", "default", "") | |||
r1 | with pytest.raises(ValueError): | |||
self.repo._local_pull(self.repo.path, reference) | ||||
r5607 | def test_validate_pull_reference_raises_on_missing_reference(self, vcsbackend_hg): | |||
r1 | target_repo = vcsbackend_hg.create_repo(number_of_commits=1) | |||
r5607 | reference = Reference("book", "invalid_reference", "a" * 40) | |||
r1 | ||||
with pytest.raises(CommitDoesNotExistError): | ||||
target_repo._validate_pull_reference(reference) | ||||
def test_heads(self): | ||||
assert set(self.repo._heads()) == set(self.repo.branches.values()) | ||||
def test_ancestor(self): | ||||
r5607 | commits = [c.raw_id for c in self.repo.get_commits(branch_name="default")] | |||
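# commits[-5] is an ancestor of commits[-3] on the default branch, so the
# common ancestor is commits[-5] regardless of the argument order.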
r1 | assert self.repo._ancestor(commits[-3], commits[-5]) == commits[-5] | |||
assert self.repo._ancestor(commits[-5], commits[-3]) == commits[-5] | ||||
def test_local_push(self): | ||||
target_repo = self.get_empty_repo() | ||||
r5607 | revisions = list(self.repo.get_commits(branch_name="default")) | |||
r1 | revision = revisions[-5].raw_id | |||
self.repo._local_push(revision, target_repo.path) | ||||
target_repo = MercurialRepository(target_repo.path) | ||||
r5607 | assert target_repo.branches["default"] == revision | |||
r1 | ||||
def test_hooks_can_be_enabled_for_local_push(self): | ||||
r5607 | revision = "deadbeef" | |||
repo_path = "test_group/test_repo" | ||||
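# _remote is mocked below, so no real push happens; the test only verifies
# that enable_hooks=True is forwarded to the remote layer as hooks=True.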
with mock.patch.object(self.repo, "_remote") as remote_mock: | ||||
r1 | self.repo._local_push(revision, repo_path, enable_hooks=True) | |||
r5607 | remote_mock.push.assert_called_once_with([revision], repo_path, hooks=True, push_branches=False) | |||
r1 | ||||
def test_local_merge(self, vcsbackend_hg): | ||||
target_repo = vcsbackend_hg.create_repo(number_of_commits=1) | ||||
source_repo = vcsbackend_hg.clone_repo(target_repo) | ||||
r5607 | vcsbackend_hg.add_file(target_repo, b"README_MERGE1", b"Version 1") | |||
r1 | target_repo = MercurialRepository(target_repo.path) | |||
r5607 | target_rev = target_repo.branches["default"] | |||
target_ref = Reference(type="branch", name="default", commit_id=target_rev) | ||||
vcsbackend_hg.add_file(source_repo, b"README_MERGE2", b"Version 2") | ||||
r1 | source_repo = MercurialRepository(source_repo.path) | |||
r5607 | source_rev = source_repo.branches["default"] | |||
source_ref = Reference(type="branch", name="default", commit_id=source_rev) | ||||
r1 | ||||
target_repo._local_pull(source_repo.path, source_ref) | ||||
r5607 | merge_message = "Merge message\n\nDescription:..." | |||
user_name = "Albert Einstein" | ||||
user_email = "albert@einstein.com" | ||||
r1 | merge_commit_id, needs_push = target_repo._local_merge( | |||
r5607 | target_ref, merge_message, user_name, user_email, source_ref | |||
) | ||||
r1 | assert needs_push | |||
target_repo = MercurialRepository(target_repo.path) | ||||
assert target_repo.commit_ids[-3] == target_rev | ||||
assert target_repo.commit_ids[-2] == source_rev | ||||
last_commit = target_repo.get_commit(merge_commit_id) | ||||
assert last_commit.message.strip() == merge_message | ||||
r5607 | assert last_commit.author == "%s <%s>" % (user_name, user_email) | |||
r1 | ||||
r5607 | assert not os.path.exists(os.path.join(target_repo.path, ".hg", "merge", "state")) | |||
r1 | ||||
def test_local_merge_source_is_fast_forward(self, vcsbackend_hg): | ||||
target_repo = vcsbackend_hg.create_repo(number_of_commits=1) | ||||
source_repo = vcsbackend_hg.clone_repo(target_repo) | ||||
r5607 | target_rev = target_repo.branches["default"] | |||
target_ref = Reference(type="branch", name="default", commit_id=target_rev) | ||||
vcsbackend_hg.add_file(source_repo, "README_MERGE2", "Version 2") | ||||
r1 | source_repo = MercurialRepository(source_repo.path) | |||
r5607 | source_rev = source_repo.branches["default"] | |||
source_ref = Reference(type="branch", name="default", commit_id=source_rev) | ||||
r1 | ||||
target_repo._local_pull(source_repo.path, source_ref) | ||||
r5607 | merge_message = "Merge message\n\nDescription:..." | |||
user_name = "Albert Einstein" | ||||
user_email = "albert@einstein.com" | ||||
r1 | merge_commit_id, needs_push = target_repo._local_merge( | |||
r5607 | target_ref, merge_message, user_name, user_email, source_ref | |||
) | ||||
r1 | assert merge_commit_id == source_rev | |||
assert needs_push | ||||
target_repo = MercurialRepository(target_repo.path) | ||||
assert target_repo.commit_ids[-2] == target_rev | ||||
assert target_repo.commit_ids[-1] == source_rev | ||||
r5607 | assert not os.path.exists(os.path.join(target_repo.path, ".hg", "merge", "state")) | |||
r1 | ||||
def test_local_merge_source_is_integrated(self, vcsbackend_hg): | ||||
target_repo = vcsbackend_hg.create_repo(number_of_commits=1) | ||||
r5607 | target_rev = target_repo.branches["default"] | |||
target_ref = Reference(type="branch", name="default", commit_id=target_rev) | ||||
r1 | ||||
r5607 | merge_message = "Merge message\n\nDescription:..." | |||
user_name = "Albert Einstein" | ||||
user_email = "albert@einstein.com" | ||||
r1 | merge_commit_id, needs_push = target_repo._local_merge( | |||
r5607 | target_ref, merge_message, user_name, user_email, target_ref | |||
) | ||||
r1 | assert merge_commit_id == target_rev | |||
assert not needs_push | ||||
target_repo = MercurialRepository(target_repo.path) | ||||
assert target_repo.commit_ids[-1] == target_rev | ||||
r5607 | assert not os.path.exists(os.path.join(target_repo.path, ".hg", "merge", "state")) | |||
r1 | ||||
def test_local_merge_raises_exception_on_conflict(self, vcsbackend_hg): | ||||
target_repo = vcsbackend_hg.create_repo(number_of_commits=1) | ||||
source_repo = vcsbackend_hg.clone_repo(target_repo) | ||||
r5607 | vcsbackend_hg.add_file(target_repo, "README_MERGE", "Version 1") | |||
r1 | target_repo = MercurialRepository(target_repo.path) | |||
r5607 | target_rev = target_repo.branches["default"] | |||
target_ref = Reference(type="branch", name="default", commit_id=target_rev) | ||||
vcsbackend_hg.add_file(source_repo, "README_MERGE", "Version 2") | ||||
r1 | source_repo = MercurialRepository(source_repo.path) | |||
r5607 | source_rev = source_repo.branches["default"] | |||
source_ref = Reference(type="branch", name="default", commit_id=source_rev) | ||||
r1 | ||||
target_repo._local_pull(source_repo.path, source_ref) | ||||
with pytest.raises(RepositoryError): | ||||
r5607 | target_repo._local_merge(target_ref, "merge_message", "user name", "user@name.com", source_ref) | |||
r1 | ||||
# Check we are not left in an intermediate merge state | ||||
r5607 | assert not os.path.exists(os.path.join(target_repo.path, ".hg", "merge", "state")) | |||
r1 | ||||
r5647 | def test_local_merge_of_two_branches_of_the_same_repo(self, backend_hg, vcs_repo): | |||
r1 | commits = [ | |||
r5607 | {"message": "a"}, | |||
{"message": "b", "branch": "b"}, | ||||
r1 | ] | |||
repo = backend_hg.create_repo(commits) | ||||
commit_ids = backend_hg.commit_ids | ||||
r5607 | target_ref = Reference(type="branch", name="default", commit_id=commit_ids["a"]) | |||
source_ref = Reference(type="branch", name="b", commit_id=commit_ids["b"]) | ||||
merge_message = "Merge message\n\nDescription:..." | ||||
user_name = "Albert Einstein" | ||||
user_email = "albert@einstein.com" | ||||
r1 | vcs_repo = repo.scm_instance() | |||
merge_commit_id, needs_push = vcs_repo._local_merge( | ||||
r5607 | target_ref, merge_message, user_name, user_email, source_ref | |||
) | ||||
r1 | assert merge_commit_id != source_ref.commit_id | |||
assert needs_push is True | ||||
commit = vcs_repo.get_commit(merge_commit_id) | ||||
assert commit.merge is True | ||||
assert commit.message == merge_message | ||||
def test_maybe_prepare_merge_workspace(self): | ||||
r5607 | workspace = self.repo._maybe_prepare_merge_workspace(1, "pr2", "unused", "unused2") | |||
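# The merge workspace is essentially a throw-away clone of the repository,
# keyed by repo id (1) and workspace id ("pr2"), that server-side merges can
# operate on without touching the original repo.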
r1 | ||||
assert os.path.isdir(workspace) | ||||
workspace_repo = MercurialRepository(workspace) | ||||
assert workspace_repo.branches == self.repo.branches | ||||
# Calling it a second time should also succeed | ||||
r5607 | workspace = self.repo._maybe_prepare_merge_workspace(1, "pr2", "unused", "unused2") | |||
r1 | assert os.path.isdir(workspace) | |||
def test_cleanup_merge_workspace(self): | ||||
r5607 | workspace = self.repo._maybe_prepare_merge_workspace(1, "pr3", "unused", "unused2") | |||
r2810 | ||||
assert os.path.isdir(workspace) | ||||
r5607 | self.repo.cleanup_merge_workspace(1, "pr3") | |||
r1 | ||||
assert not os.path.exists(workspace) | ||||
def test_cleanup_merge_workspace_invalid_workspace_id(self): | ||||
# No assert: cleaning up a non-existent workspace should still succeed.
r5607 | self.repo.cleanup_merge_workspace(1, "pr4") | |||
r1 | ||||
def test_merge_target_is_bookmark(self, vcsbackend_hg): | ||||
target_repo = vcsbackend_hg.create_repo(number_of_commits=1) | ||||
source_repo = vcsbackend_hg.clone_repo(target_repo) | ||||
r5607 | vcsbackend_hg.add_file(target_repo, "README_MERGE1", "Version 1") | |||
vcsbackend_hg.add_file(source_repo, "README_MERGE2", "Version 2") | ||||
r1 | imc = source_repo.in_memory_commit | |||
r5607 | imc.add(FileNode(b"file_x", content=source_repo.name)) | |||
imc.commit(message="Automatic commit from repo merge test", author="Automatic <automatic@rhodecode.com>") | ||||
r1 | target_commit = target_repo.get_commit() | |||
source_commit = source_repo.get_commit() | ||||
default_branch = target_repo.DEFAULT_BRANCH_NAME | ||||
r5607 | bookmark_name = "bookmark" | |||
r1 | target_repo._update(default_branch) | |||
target_repo.bookmark(bookmark_name) | ||||
r5607 | target_ref = Reference("book", bookmark_name, target_commit.raw_id) | |||
source_ref = Reference("branch", default_branch, source_commit.raw_id) | ||||
workspace_id = "test-merge" | ||||
r2810 | repo_id = repo_id_generator(target_repo.path) | |||
r1 | merge_response = target_repo.merge( | |||
r5607 | repo_id, | |||
workspace_id, | ||||
target_ref, | ||||
source_repo, | ||||
source_ref, | ||||
"test user", | ||||
"test@rhodecode.com", | ||||
"merge message 1", | ||||
dry_run=False, | ||||
) | ||||
expected_merge_response = MergeResponse(True, True, merge_response.merge_ref, MergeFailureReason.NONE) | ||||
r1 | assert merge_response == expected_merge_response | |||
r5607 | target_repo = backends.get_backend(vcsbackend_hg.alias)(target_repo.path) | |||
r1 | target_commits = list(target_repo.get_commits()) | |||
commit_ids = [c.raw_id for c in target_commits[:-1]] | ||||
assert source_ref.commit_id in commit_ids | ||||
assert target_ref.commit_id in commit_ids | ||||
merge_commit = target_commits[-1] | ||||
assert merge_commit.raw_id == merge_response.merge_ref.commit_id
assert merge_commit.message.strip() == "merge message 1"
assert merge_commit.author == "test user <test@rhodecode.com>"
r1 | ||||
# Check the bookmark was updated in the target repo | ||||
r5607 | assert target_repo.bookmarks[bookmark_name] == merge_response.merge_ref.commit_id | |||
r1 | ||||
def test_merge_source_is_bookmark(self, vcsbackend_hg): | ||||
target_repo = vcsbackend_hg.create_repo(number_of_commits=1) | ||||
source_repo = vcsbackend_hg.clone_repo(target_repo) | ||||
imc = source_repo.in_memory_commit | ||||
r5607 | imc.add(FileNode(b"file_x", content=source_repo.name)) | |||
imc.commit(message="Automatic commit from repo merge test", author="Automatic <automatic@rhodecode.com>") | ||||
r1 | target_commit = target_repo.get_commit() | |||
source_commit = source_repo.get_commit() | ||||
default_branch = target_repo.DEFAULT_BRANCH_NAME | ||||
r5607 | bookmark_name = "bookmark" | |||
target_ref = Reference("branch", default_branch, target_commit.raw_id) | ||||
r1 | source_repo._update(default_branch) | |||
source_repo.bookmark(bookmark_name) | ||||
r5607 | source_ref = Reference("book", bookmark_name, source_commit.raw_id) | |||
workspace_id = "test-merge" | ||||
r2810 | repo_id = repo_id_generator(target_repo.path) | |||
r1 | merge_response = target_repo.merge( | |||
r5607 | repo_id, | |||
workspace_id, | ||||
target_ref, | ||||
source_repo, | ||||
source_ref, | ||||
"test user", | ||||
"test@rhodecode.com", | ||||
"merge message 1", | ||||
dry_run=False, | ||||
) | ||||
expected_merge_response = MergeResponse(True, True, merge_response.merge_ref, MergeFailureReason.NONE) | ||||
r1 | assert merge_response == expected_merge_response | |||
r5607 | target_repo = backends.get_backend(vcsbackend_hg.alias)(target_repo.path) | |||
r1 | target_commits = list(target_repo.get_commits()) | |||
commit_ids = [c.raw_id for c in target_commits] | ||||
assert source_ref.commit_id == commit_ids[-1] | ||||
assert target_ref.commit_id == commit_ids[-2] | ||||
def test_merge_target_has_multiple_heads(self, vcsbackend_hg): | ||||
target_repo = vcsbackend_hg.create_repo(number_of_commits=2) | ||||
source_repo = vcsbackend_hg.clone_repo(target_repo) | ||||
r5607 | vcsbackend_hg.add_file(target_repo, "README_MERGE1", "Version 1") | |||
vcsbackend_hg.add_file(source_repo, "README_MERGE2", "Version 2") | ||||
r1 | ||||
# add an extra head to the target repo | ||||
imc = target_repo.in_memory_commit | ||||
r5647 | imc.add(FileNode(b"file_x", content=b"foo")) | |||
r1 | commits = list(target_repo.get_commits()) | |||
imc.commit( | ||||
r5607 | message="Automatic commit from repo merge test", | |||
author="Automatic <automatic@rhodecode.com>", | ||||
parents=commits[0:1], | ||||
) | ||||
r1 | ||||
target_commit = target_repo.get_commit() | ||||
source_commit = source_repo.get_commit() | ||||
default_branch = target_repo.DEFAULT_BRANCH_NAME | ||||
target_repo._update(default_branch) | ||||
r5607 | target_ref = Reference("branch", default_branch, target_commit.raw_id) | |||
source_ref = Reference("branch", default_branch, source_commit.raw_id) | ||||
workspace_id = "test-merge" | ||||
r1 | ||||
r5607 | assert len(target_repo._heads(branch="default")) == 2 | |||
heads = target_repo._heads(branch="default") | ||||
r1 | expected_merge_response = MergeResponse( | |||
r5607 | False, False, None, MergeFailureReason.HG_TARGET_HAS_MULTIPLE_HEADS, metadata={"heads": heads} | |||
) | ||||
r2810 | repo_id = repo_id_generator(target_repo.path) | |||
r1 | merge_response = target_repo.merge( | |||
r5607 | repo_id, | |||
workspace_id, | ||||
target_ref, | ||||
source_repo, | ||||
source_ref, | ||||
"test user", | ||||
"test@rhodecode.com", | ||||
"merge message 1", | ||||
dry_run=False, | ||||
) | ||||
r1 | assert merge_response == expected_merge_response | |||
def test_merge_rebase_source_is_updated_bookmark(self, vcsbackend_hg): | ||||
target_repo = vcsbackend_hg.create_repo(number_of_commits=1) | ||||
source_repo = vcsbackend_hg.clone_repo(target_repo) | ||||
r5607 | vcsbackend_hg.add_file(target_repo, b"README_MERGE1", b"Version 1") | |||
vcsbackend_hg.add_file(source_repo, b"README_MERGE2", b"Version 2") | ||||
r5087 | ||||
r1 | imc = source_repo.in_memory_commit | |||
r5607 | imc.add(FileNode(b"file_x", content=safe_bytes(source_repo.name))) | |||
imc.commit(message="Automatic commit from repo merge test", author="Automatic <automatic@rhodecode.com>") | ||||
r5087 | ||||
r1 | target_commit = target_repo.get_commit() | |||
source_commit = source_repo.get_commit() | ||||
r5607 | vcsbackend_hg.add_file(source_repo, b"LICENSE", b"LICENSE Info") | |||
r1 | ||||
default_branch = target_repo.DEFAULT_BRANCH_NAME | ||||
r5607 | bookmark_name = "bookmark" | |||
r1 | source_repo._update(default_branch) | |||
source_repo.bookmark(bookmark_name) | ||||
r5607 | target_ref = Reference("branch", default_branch, target_commit.raw_id) | |||
source_ref = Reference("book", bookmark_name, source_commit.raw_id) | ||||
r2810 | repo_id = repo_id_generator(target_repo.path) | |||
r5607 | workspace_id = "test-merge" | |||
r1 | ||||
merge_response = target_repo.merge(
r5607 | repo_id, | |||
workspace_id, | ||||
target_ref, | ||||
source_repo, | ||||
source_ref, | ||||
"test user", | ||||
"test@rhodecode.com", | ||||
"merge message 1", | ||||
dry_run=False, | ||||
use_rebase=True, | ||||
) | ||||
r1 | ||||
r5607 | expected_merge_response = MergeResponse(True, True, merge_response.merge_ref, MergeFailureReason.NONE) | |||
r1 | assert merge_response == expected_merge_response | |||
r5607 | target_repo = backends.get_backend(vcsbackend_hg.alias)(target_repo.path) | |||
r1 | last_commit = target_repo.get_commit() | |||
assert last_commit.message == source_commit.message | ||||
assert last_commit.author == source_commit.author | ||||
# This checks that we effectively did a rebase | ||||
assert last_commit.raw_id != source_commit.raw_id | ||||
# Check the target has only 4 commits: 2 were already in target and | ||||
# only two should have been added | ||||
assert len(target_repo.commit_ids) == 2 + 2 | ||||
r5647 | class TestGetShadowInstance: | |||
r3946 | @pytest.fixture() | |||
r5607 | def repo(self, vcsbackend_hg): | |||
_hg_repo = vcsbackend_hg.repo | ||||
connection_mock = mock.Mock(unsafe=True, name="connection.Hg") | ||||
mock.patch("rhodecode.lib.vcs.connection.Hg", connection_mock) | ||||
return _hg_repo | ||||
r1 | ||||
r5607 | def test_getting_shadow_instance_copies_config(self, repo): | |||
r3848 | shadow = repo.get_shadow_instance(repo.path) | |||
r5607 | assert shadow.config.serialize() == repo.config.serialize() | |||
r1 | ||||
r5607 | def test_disables_hooks_section(self, repo): | |||
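# The shadow copy drops the [hooks] section by default, presumably so that
# merge simulations in the shadow repository do not fire repository hooks twice.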
r5647 | repo.config.set("hooks", "foo", "val") | |||
r3848 | shadow = repo.get_shadow_instance(repo.path) | |||
r5647 | assert not shadow.config.items("hooks") | |||
r1 | ||||
def test_allows_to_keep_hooks(self, repo): | ||||
r5647 | repo.config.set("hooks", "foo", "val") | |||
r3848 | shadow = repo.get_shadow_instance(repo.path, enable_hooks=True) | |||
r5647 | assert shadow.config.items("hooks") | |||
r1 | ||||
r5647 | class TestMercurialCommit: | |||
@pytest.fixture(autouse=True) | ||||
def prepare(self): | ||||
self.repo = MercurialRepository(TEST_HG_REPO) | ||||
r1 | def _test_equality(self, commit): | |||
idx = commit.idx | ||||
assert commit == self.repo.get_commit(commit_idx=idx) | ||||
def test_equality(self): | ||||
indexes = [0, 10, 20] | ||||
commits = [self.repo.get_commit(commit_idx=idx) for idx in indexes] | ||||
for commit in commits: | ||||
self._test_equality(commit) | ||||
def test_default_commit(self): | ||||
r5607 | tip = self.repo.get_commit("tip") | |||
r1 | assert tip == self.repo.get_commit() | |||
assert tip == self.repo.get_commit(commit_id=None) | ||||
assert tip == self.repo.get_commit(commit_idx=None) | ||||
assert tip == list(self.repo[-1:])[0] | ||||
def test_root_node(self): | ||||
r5607 | tip = self.repo.get_commit("tip") | |||
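# Identity check: get_node(b"") must return the very same RootNode object
# that is exposed as commit.root.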
r5647 | assert tip.root is tip.get_node(b"") | |||
r1 | ||||
def test_lazy_fetch(self): | ||||
""" | ||||
Test that a commit's nodes expand and are cached as we walk through
the commit. This test is somewhat hard to write, as the order of the
checks is key here. Written by running command after command in a shell.
""" | ||||
commit = self.repo.get_commit(commit_idx=45) | ||||
assert len(commit.nodes) == 0 | ||||
root = commit.root | ||||
assert len(commit.nodes) == 1 | ||||
assert len(root.nodes) == 8 | ||||
# accessing root.nodes updates commit.nodes | ||||
assert len(commit.nodes) == 9 | ||||
r5647 | docs = commit.get_node(b"docs") | |||
r1 | # we haven't yet accessed anything new as docs dir was already cached | |||
assert len(commit.nodes) == 9 | ||||
assert len(docs.nodes) == 8 | ||||
# accessing docs.nodes updates commit.nodes | ||||
assert len(commit.nodes) == 17 | ||||
r5647 | assert docs is commit.get_node(b"docs") | |||
r1 | assert docs is root.nodes[0] | |||
assert docs is root.dirs[0] | ||||
r5647 | assert docs is commit.get_node(b"docs") | |||
r1 | ||||
def test_nodes_with_commit(self): | ||||
commit = self.repo.get_commit(commit_idx=45) | ||||
root = commit.root | ||||
r5647 | assert isinstance(root, RootNode) | |||
docs = commit.get_node(b"docs") | ||||
assert docs is commit.get_node(b"docs") | ||||
api = commit.get_node(b"docs/api") | ||||
assert api is commit.get_node(b"docs/api") | ||||
index = commit.get_node(b"docs/api/index.rst") | ||||
assert index is commit.get_node(b"docs/api/index.rst") | ||||
r1 | ||||
def test_branch_and_tags(self): | ||||
commit0 = self.repo.get_commit(commit_idx=0) | ||||
r5607 | assert commit0.branch == "default" | |||
r1 | assert commit0.tags == [] | |||
commit10 = self.repo.get_commit(commit_idx=10) | ||||
r5607 | assert commit10.branch == "default" | |||
r1 | assert commit10.tags == [] | |||
commit44 = self.repo.get_commit(commit_idx=44) | ||||
r5607 | assert commit44.branch == "web" | |||
r1 | ||||
r5607 | tip = self.repo.get_commit("tip") | |||
assert "tip" in tip.tags | ||||
r1 | ||||
def test_bookmarks(self): | ||||
commit0 = self.repo.get_commit(commit_idx=0) | ||||
assert commit0.bookmarks == [] | ||||
def _test_file_size(self, idx, path, size): | ||||
node = self.repo.get_commit(commit_idx=idx).get_node(path) | ||||
assert node.is_file() | ||||
assert node.size == size | ||||
def test_file_size(self): | ||||
to_check = ( | ||||
r5647 | (10, b"setup.py", 1068), | |||
(20, b"setup.py", 1106), | ||||
(60, b"setup.py", 1074), | ||||
(10, b"vcs/backends/base.py", 2921), | ||||
(20, b"vcs/backends/base.py", 3936), | ||||
(60, b"vcs/backends/base.py", 6189), | ||||
r1 | ) | |||
for idx, path, size in to_check: | ||||
self._test_file_size(idx, path, size) | ||||
def test_file_history_from_commits(self): | ||||
r5647 | node = self.repo[10].get_node(b"setup.py") | |||
r1 | commit_ids = [commit.raw_id for commit in node.history] | |||
r5607 | assert ["3803844fdbd3b711175fc3da9bdacfcd6d29a6fb"] == commit_ids | |||
r1 | ||||
r5647 | node = self.repo[20].get_node(b"setup.py") | |||
r1 | node_ids = [commit.raw_id for commit in node.history] | |||
r5607 | assert ["eada5a770da98ab0dd7325e29d00e0714f228d09", "3803844fdbd3b711175fc3da9bdacfcd6d29a6fb"] == node_ids | |||
r1 | ||||
# special case: we check history from a commit that changed this particular
# file, which means we verify the commit itself is included as well
r5647 | node = self.repo.get_commit("eada5a770da98ab0dd7325e29d00e0714f228d09").get_node(b"setup.py") | |||
r1 | node_ids = [commit.raw_id for commit in node.history] | |||
r5607 | assert ["eada5a770da98ab0dd7325e29d00e0714f228d09", "3803844fdbd3b711175fc3da9bdacfcd6d29a6fb"] == node_ids | |||
r1 | ||||
def test_file_history(self): | ||||
# we can only check if those commits are present in the history | ||||
# as we cannot update this test every time the file is changed
files = { | ||||
r5647 | b"setup.py": [7, 18, 45, 46, 47, 69, 77], | |||
b"vcs/nodes.py": [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60, 61, 73, 76], | ||||
b"vcs/backends/hg.py": [ | ||||
r5607 | 4, | |||
5, | ||||
6, | ||||
11, | ||||
12, | ||||
13, | ||||
14, | ||||
15, | ||||
16, | ||||
21, | ||||
22, | ||||
23, | ||||
26, | ||||
27, | ||||
28, | ||||
30, | ||||
31, | ||||
33, | ||||
35, | ||||
36, | ||||
37, | ||||
38, | ||||
39, | ||||
40, | ||||
41, | ||||
44, | ||||
45, | ||||
47, | ||||
48, | ||||
49, | ||||
53, | ||||
54, | ||||
55, | ||||
58, | ||||
60, | ||||
61, | ||||
67, | ||||
68, | ||||
69, | ||||
70, | ||||
73, | ||||
77, | ||||
78, | ||||
79, | ||||
82, | ||||
], | ||||
r1 | } | |||
for path, indexes in files.items(): | ||||
tip = self.repo.get_commit(commit_idx=indexes[-1]) | ||||
node = tip.get_node(path) | ||||
node_indexes = [commit.idx for commit in node.history] | ||||
assert set(indexes).issubset(set(node_indexes)), ( | ||||
"We assumed that %s is subset of commits for which file %s " | ||||
r5607 | "has been changed, and history of that node returned: %s" % (indexes, path, node_indexes) | |||
) | ||||
r1 | ||||
def test_file_annotate(self): | ||||
files = { | ||||
r5647 | b"vcs/backends/__init__.py": { | |||
r1 | 89: { | |||
r5607 | "lines_no": 31, | |||
"commits": [ | ||||
32, | ||||
32, | ||||
61, | ||||
32, | ||||
32, | ||||
37, | ||||
32, | ||||
32, | ||||
32, | ||||
44, | ||||
37, | ||||
37, | ||||
37, | ||||
37, | ||||
45, | ||||
37, | ||||
44, | ||||
37, | ||||
37, | ||||
37, | ||||
32, | ||||
32, | ||||
32, | ||||
32, | ||||
37, | ||||
32, | ||||
37, | ||||
37, | ||||
32, | ||||
32, | ||||
32, | ||||
], | ||||
}, | ||||
20: {"lines_no": 1, "commits": [4]}, | ||||
55: { | ||||
"lines_no": 31, | ||||
"commits": [ | ||||
32, | ||||
32, | ||||
45, | ||||
32, | ||||
32, | ||||
37, | ||||
32, | ||||
32, | ||||
32, | ||||
44, | ||||
37, | ||||
37, | ||||
37, | ||||
37, | ||||
45, | ||||
37, | ||||
44, | ||||
37, | ||||
37, | ||||
37, | ||||
32, | ||||
32, | ||||
32, | ||||
32, | ||||
37, | ||||
32, | ||||
37, | ||||
37, | ||||
32, | ||||
32, | ||||
32, | ||||
], | ||||
}, | ||||
}, | ||||
r5647 | b"vcs/exceptions.py": { | |||
r5607 | 89: { | |||
"lines_no": 18, | ||||
"commits": [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 17, 16, 16, 18, 18, 18], | ||||
r1 | }, | |||
20: { | ||||
r5607 | "lines_no": 18, | |||
"commits": [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 17, 16, 16, 18, 18, 18], | ||||
r1 | }, | |||
55: { | ||||
r5607 | "lines_no": 18, | |||
"commits": [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 17, 16, 16, 18, 18, 18], | ||||
r1 | }, | |||
}, | ||||
r5647 | b"MANIFEST.in": { | |||
r5607 | 89: {"lines_no": 5, "commits": [7, 7, 7, 71, 71]}, | |||
20: {"lines_no": 3, "commits": [7, 7, 7]}, | ||||
55: {"lines_no": 3, "commits": [7, 7, 7]}, | ||||
}, | ||||
r1 | } | |||
r5647 | for file_name, commit_dict in files.items(): | |||
r1 | for idx, __ in commit_dict.items(): | |||
commit = self.repo.get_commit(commit_idx=idx) | ||||
r5647 | l1_1 = [x[1] for x in commit.get_file_annotate(file_name)] | |||
l1_2 = [x[2]().raw_id for x in commit.get_file_annotate(file_name)] | ||||
r1 | assert l1_1 == l1_2 | |||
r5647 | l1 = l1_2 = [x[2]().idx for x in commit.get_file_annotate(file_name)] | |||
l2 = files[file_name][idx]["commits"] | ||||
r1 | assert l1 == l2, ( | |||
"The lists of commit for %s@commit_id%s" | ||||
"from annotation list should match each other," | ||||
r5647 | "got \n%s \nvs \n%s " % (file_name, idx, l1, l2) | |||
r5607 | ) | |||
r1 | ||||
def test_commit_state(self): | ||||
""" | ||||
Tests which files have been added/changed/removed at particular commit | ||||
""" | ||||
# commit_id 46ad32a4f974: | ||||
# hg st --rev 46ad32a4f974 | ||||
# changed: 13 | ||||
# added: 20 | ||||
# removed: 1 | ||||
r5647 | changed = { | |||
b".hgignore", | ||||
b"README.rst", | ||||
b"docs/conf.py", | ||||
b"docs/index.rst", | ||||
b"setup.py", | ||||
b"tests/test_hg.py", | ||||
b"tests/test_nodes.py", | ||||
b"vcs/__init__.py", | ||||
b"vcs/backends/__init__.py", | ||||
b"vcs/backends/base.py", | ||||
b"vcs/backends/hg.py", | ||||
b"vcs/nodes.py", | ||||
b"vcs/utils/__init__.py", | ||||
} | ||||
r1 | ||||
r5647 | added = { | |||
b"docs/api/backends/hg.rst", | ||||
b"docs/api/backends/index.rst", | ||||
b"docs/api/index.rst", | ||||
b"docs/api/nodes.rst", | ||||
b"docs/api/web/index.rst", | ||||
b"docs/api/web/simplevcs.rst", | ||||
b"docs/installation.rst", | ||||
b"docs/quickstart.rst", | ||||
b"setup.cfg", | ||||
b"vcs/utils/baseui_config.py", | ||||
b"vcs/utils/web.py", | ||||
b"vcs/web/__init__.py", | ||||
b"vcs/web/exceptions.py", | ||||
b"vcs/web/simplevcs/__init__.py", | ||||
b"vcs/web/simplevcs/exceptions.py", | ||||
b"vcs/web/simplevcs/middleware.py", | ||||
b"vcs/web/simplevcs/models.py", | ||||
b"vcs/web/simplevcs/settings.py", | ||||
b"vcs/web/simplevcs/utils.py", | ||||
b"vcs/web/simplevcs/views.py", | ||||
} | ||||
r1 | ||||
r5647 | removed = {b"docs/api.rst"} | |||
r1 | ||||
r5607 | commit64 = self.repo.get_commit("46ad32a4f974") | |||
r5647 | assert set((node for node in commit64.added_paths)) == added | |||
assert set((node for node in commit64.changed_paths)) == changed | ||||
assert set((node for node in commit64.removed_paths)) == removed | ||||
r1 | ||||
# commit_id b090f22d27d6:
# hg st --rev b090f22d27d6
# changed: 1
# added: 0
# removed: 0
r5607 | commit88 = self.repo.get_commit("b090f22d27d6") | |||
r5647 | assert set((node for node in commit88.added_paths)) == set() | |||
assert set((node for node in commit88.changed_paths)) == {b".hgignore"} | ||||
assert set((node for node in commit88.removed_paths)) == set() | ||||
r1 | ||||
# | ||||
# 85: | ||||
# added: 2 [ | ||||
# 'vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py'] | ||||
# changed: 4 ['vcs/web/simplevcs/models.py', ...] | ||||
# removed: 1 ['vcs/utils/web.py'] | ||||
commit85 = self.repo.get_commit(commit_idx=85) | ||||
r5647 | assert set((node for node in commit85.added_paths)) == {b"vcs/utils/diffs.py", b"vcs/web/simplevcs/views/diffs.py"} | |||
assert set((node for node in commit85.changed_paths)) == { | ||||
b"vcs/web/simplevcs/models.py", | ||||
b"vcs/web/simplevcs/utils.py", | ||||
b"vcs/web/simplevcs/views/__init__.py", | ||||
b"vcs/web/simplevcs/views/repository.py", | ||||
} | ||||
assert set((node for node in commit85.removed_paths)) == {b"vcs/utils/web.py"} | ||||
r1 | ||||
def test_files_state(self): | ||||
""" | ||||
Tests state of FileNodes. | ||||
""" | ||||
commit = self.repo.get_commit(commit_idx=85) | ||||
r5647 | node = commit.get_node(b"vcs/utils/diffs.py") | |||
assert node.bytes_path in commit.added_paths | ||||
r1 | ||||
commit = self.repo.get_commit(commit_idx=88) | ||||
r5647 | node = commit.get_node(b".hgignore") | |||
assert node.bytes_path in commit.changed_paths | ||||
r1 | ||||
commit = self.repo.get_commit(commit_idx=85) | ||||
r5647 | node = commit.get_node(b"setup.py") | |||
assert node.bytes_path not in commit.affected_files | ||||
r1 | ||||
# If a node was removed in this commit, trying to fetch it raises
# NodeDoesNotExistError
commit = self.repo.get_commit(commit_idx=2) | ||||
r5647 | path = b"vcs/backends/BaseRepository.py" | |||
r1 | with pytest.raises(NodeDoesNotExistError): | |||
commit.get_node(path) | ||||
r5647 | ||||
# but its path is still listed in the commit's ``removed_paths``
r5647 | assert path in [rf for rf in commit.removed_paths] | |||
r1 | ||||
def test_commit_message_is_unicode(self): | ||||
for cm in self.repo: | ||||
r4952 | assert type(cm.message) == str | |||
r1 | ||||
def test_commit_author_is_unicode(self): | ||||
for cm in self.repo: | ||||
r4952 | assert type(cm.author) == str | |||
r1 | ||||
r5087 | def test_repo_files_content_type(self): | |||
r1 | test_commit = self.repo.get_commit(commit_idx=100) | |||
r5647 | for node in test_commit.get_node(b"/"): | |||
r1 | if node.is_file(): | |||
r5087 | assert type(node.content) == bytes | |||
assert type(node.str_content) == str | ||||
r1 | ||||
def test_wrong_path(self): | ||||
# 'setup.py' exists in the root dir, but not at this path:
r5647 | path = b"foo/bar/setup.py" | |||
r1 | with pytest.raises(VCSError): | |||
self.repo.get_commit().get_node(path) | ||||
def test_author_email(self): | ||||
r5607 | assert "marcin@python-blog.com" == self.repo.get_commit("b986218ba1c9").author_email | |||
assert "lukasz.balcerzak@python-center.pl" == self.repo.get_commit("3803844fdbd3").author_email | ||||
assert "" == self.repo.get_commit("84478366594b").author_email | ||||
r1 | ||||
def test_author_username(self): | ||||
r5607 | assert "Marcin Kuzminski" == self.repo.get_commit("b986218ba1c9").author_name | |||
assert "Lukasz Balcerzak" == self.repo.get_commit("3803844fdbd3").author_name | ||||
assert "marcink" == self.repo.get_commit("84478366594b").author_name | ||||
r1 | ||||
r5647 | class TestLargeFileRepo: | |||
r1577 | def test_large_file(self, backend_hg): | |||
r5607 | conf = make_db_config() | |||
hg_largefiles_store = conf.get("largefiles", "usercache") | ||||
repo = backend_hg.create_test_repo("largefiles", conf) | ||||
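# Mercurial's largefiles extension keeps small pointer files under .hglf/ in
# the repository itself, while the real file content lives in the "usercache"
# store configured above.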
r1577 | ||||
tip = repo.scm_instance().get_commit() | ||||
r5647 | node = tip.get_node(b".hglf/thisfileislarge") | |||
r1577 | ||||
lf_node = node.get_largefile_node() | ||||
assert lf_node.is_largefile() is True | ||||
assert lf_node.size == 1024000 | ||||
r5647 | assert lf_node.name == b".hglf/thisfileislarge" | |||
r1577 | ||||
r5647 | class TestGetBranchName: | |||
@pytest.fixture(autouse=True) | ||||
def prepare(self): | ||||
self.repo = MercurialRepository(TEST_HG_REPO) | ||||
r1 | def test_returns_ref_name_when_type_is_branch(self): | |||
r5607 | ref = self._create_ref("branch", "fake-name") | |||
r1 | result = self.repo._get_branch_name(ref) | |||
assert result == ref.name | ||||
@pytest.mark.parametrize("type_", ("book", "tag")) | ||||
def test_queries_remote_when_type_is_not_branch(self, type_): | ||||
r5607 | ref = self._create_ref(type_, "wrong-fake-name") | |||
r1 | with mock.patch.object(self.repo, "_remote") as remote_mock: | |||
remote_mock.ctx_branch.return_value = "fake-name" | ||||
result = self.repo._get_branch_name(ref) | ||||
assert result == "fake-name" | ||||
remote_mock.ctx_branch.assert_called_once_with(ref.commit_id) | ||||
def _create_ref(self, type_, name): | ||||
ref = mock.Mock() | ||||
ref.type = type_ | ||||
ref.name = name
r1 | ref.commit_id = "deadbeef" | |||
return ref | ||||
r5647 | class TestIsTheSameBranch: | |||
@pytest.fixture(autouse=True) | ||||
def prepare(self): | ||||
self.repo = MercurialRepository(TEST_HG_REPO) | ||||
r1 | def test_returns_true_when_branches_are_equal(self): | |||
source_ref = mock.Mock(name="source-ref") | ||||
target_ref = mock.Mock(name="target-ref") | ||||
r5607 | branch_name_patcher = mock.patch.object(self.repo, "_get_branch_name", return_value="default") | |||
r1 | with branch_name_patcher as branch_name_mock: | |||
result = self.repo._is_the_same_branch(source_ref, target_ref) | ||||
expected_calls = [mock.call(source_ref), mock.call(target_ref)] | ||||
assert branch_name_mock.call_args_list == expected_calls | ||||
assert result is True | ||||
def test_returns_false_when_branches_are_not_equal(self): | ||||
source_ref = mock.Mock(name="source-ref") | ||||
source_ref.name = "source-branch" | ||||
target_ref = mock.Mock(name="target-ref") | ||||
source_ref.name = "target-branch" | ||||
def side_effect(ref): | ||||
return ref.name | ||||
r5607 | branch_name_patcher = mock.patch.object(self.repo, "_get_branch_name", side_effect=side_effect) | |||
r1 | with branch_name_patcher as branch_name_mock: | |||
result = self.repo._is_the_same_branch(source_ref, target_ref) | ||||
expected_calls = [mock.call(source_ref), mock.call(target_ref)] | ||||
assert branch_name_mock.call_args_list == expected_calls | ||||
assert result is False | ||||