# Copyright (C) 2010-2020 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import datetime import time import pytest from rhodecode.lib.str_utils import safe_bytes from rhodecode.lib.vcs.backends.base import ( CollectionGenerator, FILEMODE_DEFAULT, EmptyCommit) from rhodecode.lib.vcs.exceptions import ( BranchDoesNotExistError, CommitDoesNotExistError, RepositoryError, EmptyRepositoryError) from rhodecode.lib.vcs.nodes import ( FileNode, AddedFileNodesGenerator, ChangedFileNodesGenerator, RemovedFileNodesGenerator) from rhodecode.tests import get_new_dir from rhodecode.tests.vcs.conftest import BackendTestMixin class TestBaseChangeset(object): def test_is_deprecated(self): from rhodecode.lib.vcs.backends.base import BaseChangeset pytest.deprecated_call(BaseChangeset) class TestEmptyCommit(object): def test_branch_without_alias_returns_none(self): commit = EmptyCommit() assert commit.branch is None @pytest.mark.usefixtures("vcs_repository_support") class TestCommitsInNonEmptyRepo(BackendTestMixin): recreate_repo_per_test = True @classmethod def _get_commits(cls): start_date = datetime.datetime(2010, 1, 1, 20) for x in range(5): yield { 'message': 'Commit %d' % x, 'author': 'Joe Doe ', 'date': start_date + datetime.timedelta(hours=12 * x), 'added': [ FileNode(b'file_%d.txt' % x, content=b'Foobar %d' % x), ], } def test_walk_returns_empty_list_in_case_of_file(self): result = list(self.tip.walk('file_0.txt')) assert result == [] @pytest.mark.backends("git", "hg") def test_new_branch(self): self.imc.add(FileNode(b'docs/index.txt', content=b'Documentation\n')) foobar_tip = self.imc.commit( message='New branch: foobar', author='joe ', branch='foobar', ) assert 'foobar' in self.repo.branches assert foobar_tip.branch == 'foobar' # 'foobar' should be the only branch that contains the new commit branch = list(self.repo.branches.values()) assert branch[0] != branch[1] @pytest.mark.backends("git", "hg") def test_new_head_in_default_branch(self): tip = self.repo.get_commit() self.imc.add(FileNode(b'docs/index.txt', content=b'Documentation\n')) foobar_tip = self.imc.commit( message='New branch: foobar', author='joe ', branch='foobar', parents=[tip], ) self.imc.change(FileNode(b'docs/index.txt', content=b'Documentation\nand more...\n')) newtip = self.imc.commit( message='At default branch', author='joe ', branch=foobar_tip.branch, parents=[foobar_tip], ) newest_tip = self.imc.commit( message='Merged with %s' % foobar_tip.raw_id, author='joe ', branch=self.backend_class.DEFAULT_BRANCH_NAME, parents=[newtip, foobar_tip], ) assert newest_tip.branch == self.backend_class.DEFAULT_BRANCH_NAME @pytest.mark.backends("git", "hg") def test_get_commits_respects_branch_name(self): """ * e1930d0 (HEAD, master) Back in default branch | * e1930d0 (docs) New Branch: docs2 | * dcc14fa New branch: docs |/ * e63c41a Initial commit ... * 624d3db Commit 0 :return: """ DEFAULT_BRANCH = self.repo.DEFAULT_BRANCH_NAME TEST_BRANCH = 'docs' org_tip = self.repo.get_commit() self.imc.add(FileNode(b'readme.txt', content=b'Document\n')) initial = self.imc.commit( message='Initial commit', author='joe ', parents=[org_tip], branch=DEFAULT_BRANCH,) self.imc.add(FileNode(b'newdoc.txt', content=b'foobar\n')) docs_branch_commit1 = self.imc.commit( message='New branch: docs', author='joe ', parents=[initial], branch=TEST_BRANCH,) self.imc.add(FileNode(b'newdoc2.txt', content=b'foobar2\n')) docs_branch_commit2 = self.imc.commit( message='New branch: docs2', author='joe ', parents=[docs_branch_commit1], branch=TEST_BRANCH,) self.imc.add(FileNode(b'newfile', content=b'hello world\n')) self.imc.commit( message='Back in default branch', author='joe ', parents=[initial], branch=DEFAULT_BRANCH,) default_branch_commits = self.repo.get_commits(branch_name=DEFAULT_BRANCH) assert docs_branch_commit1 not in list(default_branch_commits) assert docs_branch_commit2 not in list(default_branch_commits) docs_branch_commits = self.repo.get_commits( start_id=self.repo.commit_ids[0], end_id=self.repo.commit_ids[-1], branch_name=TEST_BRANCH) assert docs_branch_commit1 in list(docs_branch_commits) assert docs_branch_commit2 in list(docs_branch_commits) @pytest.mark.backends("svn") def test_get_commits_respects_branch_name_svn(self, vcsbackend_svn): repo = vcsbackend_svn['svn-simple-layout'] commits = repo.get_commits(branch_name='trunk') commit_indexes = [c.idx for c in commits] assert commit_indexes == [1, 2, 3, 7, 12, 15] def test_get_commit_by_index(self): for idx in [1, 2, 3, 4]: assert idx == self.repo.get_commit(commit_idx=idx).idx def test_get_commit_by_branch(self): for branch, commit_id in self.repo.branches.items(): assert commit_id == self.repo.get_commit(branch).raw_id def test_get_commit_by_tag(self): for tag, commit_id in self.repo.tags.items(): assert commit_id == self.repo.get_commit(tag).raw_id def test_get_commit_parents(self): repo = self.repo for test_idx in [1, 2, 3]: commit = repo.get_commit(commit_idx=test_idx - 1) assert [commit] == repo.get_commit(commit_idx=test_idx).parents def test_get_commit_children(self): repo = self.repo for test_idx in [1, 2, 3]: commit = repo.get_commit(commit_idx=test_idx + 1) assert [commit] == repo.get_commit(commit_idx=test_idx).children @pytest.mark.usefixtures("vcs_repository_support") class TestCommits(BackendTestMixin): recreate_repo_per_test = False @classmethod def _get_commits(cls): start_date = datetime.datetime(2010, 1, 1, 20) for x in range(5): yield { 'message': 'Commit %d' % x, 'author': 'Joe Doe ', 'date': start_date + datetime.timedelta(hours=12 * x), 'added': [ FileNode(b'file_%d.txt' % x, content=b'Foobar %d' % x) ], } def test_simple(self): tip = self.repo.get_commit() assert tip.date, datetime.datetime(2010, 1, 3 == 20) def test_simple_serialized_commit(self): tip = self.repo.get_commit() # json.dumps(tip) uses .__json__() method data = tip.__json__() assert 'branch' in data assert data['revision'] def test_retrieve_tip(self): tip = self.repo.get_commit('tip') assert tip == self.repo.get_commit() def test_invalid(self): with pytest.raises(CommitDoesNotExistError): self.repo.get_commit(commit_idx=123456789) def test_idx(self): commit = self.repo[0] assert commit.idx == 0 def test_negative_idx(self): commit = self.repo.get_commit(commit_idx=-1) assert commit.idx >= 0 def test_revision_is_deprecated(self): def get_revision(commit): return commit.revision commit = self.repo[0] pytest.deprecated_call(get_revision, commit) def test_size(self): tip = self.repo.get_commit() size = 5 * len('Foobar N') # Size of 5 files assert tip.size == size def test_size_at_commit(self): tip = self.repo.get_commit() size = 5 * len('Foobar N') # Size of 5 files assert self.repo.size_at_commit(tip.raw_id) == size def test_size_at_first_commit(self): commit = self.repo[0] size = len('Foobar N') # Size of 1 file assert self.repo.size_at_commit(commit.raw_id) == size def test_author(self): tip = self.repo.get_commit() assert_text_equal(tip.author, 'Joe Doe ') def test_author_name(self): tip = self.repo.get_commit() assert_text_equal(tip.author_name, 'Joe Doe') def test_author_email(self): tip = self.repo.get_commit() assert_text_equal(tip.author_email, 'joe.doe@example.com') def test_message(self): tip = self.repo.get_commit() assert_text_equal(tip.message, 'Commit 4') def test_diff(self): tip = self.repo.get_commit() diff = tip.diff() assert b"+Foobar 4" in diff.raw.tobytes() def test_prev(self): tip = self.repo.get_commit() prev_commit = tip.prev() assert prev_commit.message == 'Commit 3' def test_prev_raises_on_first_commit(self): commit = self.repo.get_commit(commit_idx=0) with pytest.raises(CommitDoesNotExistError): commit.prev() def test_prev_works_on_second_commit_issue_183(self): commit = self.repo.get_commit(commit_idx=1) prev_commit = commit.prev() assert prev_commit.idx == 0 def test_next(self): commit = self.repo.get_commit(commit_idx=2) next_commit = commit.next() assert next_commit.message == 'Commit 3' def test_next_raises_on_tip(self): commit = self.repo.get_commit() with pytest.raises(CommitDoesNotExistError): commit.next() def test_get_path_commit(self): commit = self.repo.get_commit() commit.get_path_commit('file_4.txt') assert commit.message == 'Commit 4' def test_get_filenodes_generator(self): tip = self.repo.get_commit() filepaths = [node.path for node in tip.get_filenodes_generator()] assert filepaths == ['file_%d.txt' % x for x in range(5)] def test_get_file_annotate(self): file_added_commit = self.repo.get_commit(commit_idx=3) annotations = list(file_added_commit.get_file_annotate('file_3.txt')) line_no, commit_id, commit_loader, line = annotations[0] assert line_no == 1 assert commit_id == file_added_commit.raw_id assert commit_loader() == file_added_commit assert 'Foobar 3' in line def test_get_file_annotate_does_not_exist(self): file_added_commit = self.repo.get_commit(commit_idx=2) # TODO: Should use a specific exception class here? with pytest.raises(Exception): list(file_added_commit.get_file_annotate('file_3.txt')) def test_get_file_annotate_tip(self): tip = self.repo.get_commit() commit = self.repo.get_commit(commit_idx=3) expected_values = list(commit.get_file_annotate('file_3.txt')) annotations = list(tip.get_file_annotate('file_3.txt')) # Note: Skip index 2 because the loader function is not the same for idx in (0, 1, 3): assert annotations[0][idx] == expected_values[0][idx] def test_get_commits_is_ordered_by_date(self): commits = self.repo.get_commits() assert isinstance(commits, CollectionGenerator) assert len(commits) == 0 or len(commits) != 0 commits = list(commits) ordered_by_date = sorted(commits, key=lambda commit: commit.date) assert commits == ordered_by_date def test_get_commits_respects_start(self): second_id = self.repo.commit_ids[1] commits = self.repo.get_commits(start_id=second_id) assert isinstance(commits, CollectionGenerator) commits = list(commits) assert len(commits) == 4 def test_get_commits_includes_start_commit(self): second_id = self.repo.commit_ids[1] commits = self.repo.get_commits(start_id=second_id) assert isinstance(commits, CollectionGenerator) commits = list(commits) assert commits[0].raw_id == second_id def test_get_commits_respects_end(self): second_id = self.repo.commit_ids[1] commits = self.repo.get_commits(end_id=second_id) assert isinstance(commits, CollectionGenerator) commits = list(commits) assert commits[-1].raw_id == second_id assert len(commits) == 2 def test_get_commits_respects_both_start_and_end(self): second_id = self.repo.commit_ids[1] third_id = self.repo.commit_ids[2] commits = self.repo.get_commits(start_id=second_id, end_id=third_id) assert isinstance(commits, CollectionGenerator) commits = list(commits) assert len(commits) == 2 def test_get_commits_on_empty_repo_raises_EmptyRepository_error(self): repo_path = get_new_dir(str(time.time())) repo = self.Backend(repo_path, create=True) with pytest.raises(EmptyRepositoryError): list(repo.get_commits(start_id='foobar')) def test_get_commits_respects_hidden(self): commits = self.repo.get_commits(show_hidden=True) assert isinstance(commits, CollectionGenerator) assert len(commits) == 5 def test_get_commits_includes_end_commit(self): second_id = self.repo.commit_ids[1] commits = self.repo.get_commits(end_id=second_id) assert isinstance(commits, CollectionGenerator) assert len(commits) == 2 commits = list(commits) assert commits[-1].raw_id == second_id def test_get_commits_respects_start_date(self): start_date = datetime.datetime(2010, 1, 2) commits = self.repo.get_commits(start_date=start_date) assert isinstance(commits, CollectionGenerator) # Should be 4 commits after 2010-01-02 00:00:00 assert len(commits) == 4 for c in commits: assert c.date >= start_date def test_get_commits_respects_start_date_with_branch(self): start_date = datetime.datetime(2010, 1, 2) commits = self.repo.get_commits( start_date=start_date, branch_name=self.repo.DEFAULT_BRANCH_NAME) assert isinstance(commits, CollectionGenerator) # Should be 4 commits after 2010-01-02 00:00:00 assert len(commits) == 4 for c in commits: assert c.date >= start_date def test_get_commits_respects_start_date_and_end_date(self): start_date = datetime.datetime(2010, 1, 2) end_date = datetime.datetime(2010, 1, 3) commits = self.repo.get_commits(start_date=start_date, end_date=end_date) assert isinstance(commits, CollectionGenerator) assert len(commits) == 2 for c in commits: assert c.date >= start_date assert c.date <= end_date def test_get_commits_respects_end_date(self): end_date = datetime.datetime(2010, 1, 2) commits = self.repo.get_commits(end_date=end_date) assert isinstance(commits, CollectionGenerator) assert len(commits) == 1 for c in commits: assert c.date <= end_date def test_get_commits_respects_reverse(self): commits = self.repo.get_commits() # no longer reverse support assert isinstance(commits, CollectionGenerator) assert len(commits) == 5 commit_ids = reversed([c.raw_id for c in commits]) assert list(commit_ids) == list(reversed(self.repo.commit_ids)) def test_get_commits_slice_generator(self): commits = self.repo.get_commits( branch_name=self.repo.DEFAULT_BRANCH_NAME) assert isinstance(commits, CollectionGenerator) commit_slice = list(commits[1:3]) assert len(commit_slice) == 2 def test_get_commits_raise_commitdoesnotexist_for_wrong_start(self): with pytest.raises(CommitDoesNotExistError): list(self.repo.get_commits(start_id='foobar')) def test_get_commits_raise_commitdoesnotexist_for_wrong_end(self): with pytest.raises(CommitDoesNotExistError): list(self.repo.get_commits(end_id='foobar')) def test_get_commits_raise_branchdoesnotexist_for_wrong_branch_name(self): with pytest.raises(BranchDoesNotExistError): list(self.repo.get_commits(branch_name='foobar')) def test_get_commits_raise_repositoryerror_for_wrong_start_end(self): start_id = self.repo.commit_ids[-1] end_id = self.repo.commit_ids[0] with pytest.raises(RepositoryError): list(self.repo.get_commits(start_id=start_id, end_id=end_id)) def test_get_commits_raises_for_numerical_ids(self): with pytest.raises(TypeError): self.repo.get_commits(start_id=1, end_id=2) def test_commit_equality(self): commit1 = self.repo.get_commit(self.repo.commit_ids[0]) commit2 = self.repo.get_commit(self.repo.commit_ids[1]) assert commit1 == commit1 assert commit2 == commit2 assert commit1 != commit2 assert commit2 != commit1 assert commit1 is not None assert commit2 is not None assert 1 != commit1 assert 'string' != commit1 @pytest.mark.parametrize("filename, expected", [ ("README.rst", False), ("README", True), ]) def test_commit_is_link(vcsbackend, filename, expected): commit = vcsbackend.repo.get_commit() link_status = commit.is_link(filename) assert link_status is expected @pytest.mark.usefixtures("vcs_repository_support") class TestCommitsChanges(BackendTestMixin): recreate_repo_per_test = False @classmethod def _get_commits(cls): return [ { 'message': 'Initial', 'author': 'Joe Doe ', 'date': datetime.datetime(2010, 1, 1, 20), 'added': [ FileNode(b'foo/bar', content=b'foo'), FileNode(safe_bytes('foo/bał'), content=b'foo'), FileNode(b'foobar', content=b'foo'), FileNode(b'qwe', content=b'foo'), ], }, { 'message': 'Massive changes', 'author': 'Joe Doe ', 'date': datetime.datetime(2010, 1, 1, 22), 'added': [FileNode(b'fallout', content=b'War never changes')], 'changed': [ FileNode(b'foo/bar', content=b'baz'), FileNode(b'foobar', content=b'baz'), ], 'removed': [FileNode(b'qwe')], }, ] def test_initial_commit(self, local_dt_to_utc): commit = self.repo.get_commit(commit_idx=0) assert set(commit.added) == { commit.get_node('foo/bar'), commit.get_node('foo/bał'), commit.get_node('foobar'), commit.get_node('qwe') } assert set(commit.changed) == set() assert set(commit.removed) == set() assert set(commit.affected_files) == {'foo/bar', 'foo/bał', 'foobar', 'qwe'} assert commit.date == local_dt_to_utc( datetime.datetime(2010, 1, 1, 20, 0)) def test_head_added(self): commit = self.repo.get_commit() assert isinstance(commit.added, AddedFileNodesGenerator) assert set(commit.added) == {commit.get_node('fallout')} assert isinstance(commit.changed, ChangedFileNodesGenerator) assert set(commit.changed) == {commit.get_node('foo/bar'), commit.get_node('foobar')} assert isinstance(commit.removed, RemovedFileNodesGenerator) assert len(commit.removed) == 1 assert list(commit.removed)[0].path == 'qwe' def test_get_filemode(self): commit = self.repo.get_commit() assert FILEMODE_DEFAULT == commit.get_file_mode('foo/bar') def test_get_filemode_non_ascii(self): commit = self.repo.get_commit() assert FILEMODE_DEFAULT == commit.get_file_mode('foo/bał') assert FILEMODE_DEFAULT == commit.get_file_mode('foo/bał') def test_get_path_history(self): commit = self.repo.get_commit() history = commit.get_path_history('foo/bar') assert len(history) == 2 def test_get_path_history_with_limit(self): commit = self.repo.get_commit() history = commit.get_path_history('foo/bar', limit=1) assert len(history) == 1 def test_get_path_history_first_commit(self): commit = self.repo[0] history = commit.get_path_history('foo/bar') assert len(history) == 1 def assert_text_equal(expected, given): assert expected == given assert isinstance(expected, str) assert isinstance(given, str)