# -*- coding: utf-8 -*- # Copyright (C) 2010-2020 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import mock import pytest import lxml.html from rhodecode.lib.vcs.exceptions import RepositoryRequirementError from rhodecode.tests import assert_session_flash from rhodecode.tests.utils import AssertResponse, commit_change def route_path(name, params=None, **kwargs): import urllib.request, urllib.parse, urllib.error base_url = { 'repo_compare_select': '/{repo_name}/compare', 'repo_compare': '/{repo_name}/compare/{source_ref_type}@{source_ref}...{target_ref_type}@{target_ref}', }[name].format(**kwargs) if params: base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params)) return base_url @pytest.mark.usefixtures("autologin_user", "app") class TestCompareView(object): def test_compare_index_is_reached_at_least_once(self, backend): repo = backend.repo self.app.get( route_path('repo_compare_select', repo_name=repo.repo_name)) @pytest.mark.xfail_backends("svn", reason="Requires pull") def test_compare_remote_with_different_commit_indexes(self, backend): # Preparing the following repository structure: # # Origin repository has two commits: # # 0 1 # A -- D # # The fork of it has a few more commits and "D" has a commit index # which does not exist in origin. # # 0 1 2 3 4 # A -- -- -- D -- E # \- B -- C # fork = backend.create_repo() # prepare fork commit0 = commit_change( fork.repo_name, filename='file1', content='A', message='A', vcs_type=backend.alias, parent=None, newfile=True) commit1 = commit_change( fork.repo_name, filename='file1', content='B', message='B, child of A', vcs_type=backend.alias, parent=commit0) commit_change( # commit 2 fork.repo_name, filename='file1', content='C', message='C, child of B', vcs_type=backend.alias, parent=commit1) commit3 = commit_change( fork.repo_name, filename='file1', content='D', message='D, child of A', vcs_type=backend.alias, parent=commit0) commit4 = commit_change( fork.repo_name, filename='file1', content='E', message='E, child of D', vcs_type=backend.alias, parent=commit3) # prepare origin repository, taking just the history up to D origin = backend.create_repo() origin_repo = origin.scm_instance(cache=False) origin_repo.config.clear_section('hooks') origin_repo.pull(fork.repo_full_path, commit_ids=[commit3.raw_id]) origin_repo = origin.scm_instance(cache=False) # cache rebuild # Verify test fixture setup # This does not work for git if backend.alias != 'git': assert 5 == len(fork.scm_instance().commit_ids) assert 2 == len(origin_repo.commit_ids) # Comparing the revisions response = self.app.get( route_path('repo_compare', repo_name=origin.repo_name, source_ref_type="rev", source_ref=commit3.raw_id, target_ref_type="rev", target_ref=commit4.raw_id, params=dict(merge='1', target_repo=fork.repo_name) )) compare_page = ComparePage(response) compare_page.contains_commits([commit4]) @pytest.mark.xfail_backends("svn", reason="Depends on branch support") def test_compare_forks_on_branch_extra_commits(self, backend): repo1 = backend.create_repo() # commit something ! commit0 = commit_change( repo1.repo_name, filename='file1', content='line1\n', message='commit1', vcs_type=backend.alias, parent=None, newfile=True) # fork this repo repo2 = backend.create_fork() # add two extra commit into fork commit1 = commit_change( repo2.repo_name, filename='file1', content='line1\nline2\n', message='commit2', vcs_type=backend.alias, parent=commit0) commit2 = commit_change( repo2.repo_name, filename='file1', content='line1\nline2\nline3\n', message='commit3', vcs_type=backend.alias, parent=commit1) commit_id1 = repo1.scm_instance().DEFAULT_BRANCH_NAME commit_id2 = repo2.scm_instance().DEFAULT_BRANCH_NAME response = self.app.get( route_path('repo_compare', repo_name=repo1.repo_name, source_ref_type="branch", source_ref=commit_id2, target_ref_type="branch", target_ref=commit_id1, params=dict(merge='1', target_repo=repo2.repo_name) )) response.mustcontain('%s@%s' % (repo1.repo_name, commit_id2)) response.mustcontain('%s@%s' % (repo2.repo_name, commit_id1)) compare_page = ComparePage(response) compare_page.contains_change_summary(1, 2, 0) compare_page.contains_commits([commit1, commit2]) anchor = 'a_c-{}-826e8142e6ba'.format(commit0.short_id) compare_page.contains_file_links_and_anchors([('file1', anchor), ]) # Swap is removed when comparing branches since it's a PR feature and # it is then a preview mode compare_page.swap_is_hidden() compare_page.target_source_are_disabled() @pytest.mark.xfail_backends("svn", reason="Depends on branch support") def test_compare_forks_on_branch_extra_commits_origin_has_incomming(self, backend): repo1 = backend.create_repo() # commit something ! commit0 = commit_change( repo1.repo_name, filename='file1', content='line1\n', message='commit1', vcs_type=backend.alias, parent=None, newfile=True) # fork this repo repo2 = backend.create_fork() # now commit something to origin repo commit_change( repo1.repo_name, filename='file2', content='line1file2\n', message='commit2', vcs_type=backend.alias, parent=commit0, newfile=True) # add two extra commit into fork commit1 = commit_change( repo2.repo_name, filename='file1', content='line1\nline2\n', message='commit2', vcs_type=backend.alias, parent=commit0) commit2 = commit_change( repo2.repo_name, filename='file1', content='line1\nline2\nline3\n', message='commit3', vcs_type=backend.alias, parent=commit1) commit_id1 = repo1.scm_instance().DEFAULT_BRANCH_NAME commit_id2 = repo2.scm_instance().DEFAULT_BRANCH_NAME response = self.app.get( route_path('repo_compare', repo_name=repo1.repo_name, source_ref_type="branch", source_ref=commit_id2, target_ref_type="branch", target_ref=commit_id1, params=dict(merge='1', target_repo=repo2.repo_name), )) response.mustcontain('%s@%s' % (repo1.repo_name, commit_id2)) response.mustcontain('%s@%s' % (repo2.repo_name, commit_id1)) compare_page = ComparePage(response) compare_page.contains_change_summary(1, 2, 0) compare_page.contains_commits([commit1, commit2]) anchor = 'a_c-{}-826e8142e6ba'.format(commit0.short_id) compare_page.contains_file_links_and_anchors([('file1', anchor), ]) # Swap is removed when comparing branches since it's a PR feature and # it is then a preview mode compare_page.swap_is_hidden() compare_page.target_source_are_disabled() @pytest.mark.xfail_backends("svn") # TODO(marcink): no svn support for compare two seperate repos def test_compare_of_unrelated_forks(self, backend): orig = backend.create_repo(number_of_commits=1) fork = backend.create_repo(number_of_commits=1) response = self.app.get( route_path('repo_compare', repo_name=orig.repo_name, source_ref_type="rev", source_ref="tip", target_ref_type="rev", target_ref="tip", params=dict(merge='1', target_repo=fork.repo_name), ), status=302) response = response.follow() response.mustcontain("Repositories unrelated.") @pytest.mark.xfail_backends("svn") def test_compare_cherry_pick_commits_from_bottom(self, backend): # repo1: # commit0: # commit1: # repo1-fork- in which we will cherry pick bottom commits # commit0: # commit1: # commit2: x # commit3: x # commit4: x # commit5: # make repo1, and commit1+commit2 repo1 = backend.create_repo() # commit something ! commit0 = commit_change( repo1.repo_name, filename='file1', content='line1\n', message='commit1', vcs_type=backend.alias, parent=None, newfile=True) commit1 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\n', message='commit2', vcs_type=backend.alias, parent=commit0) # fork this repo repo2 = backend.create_fork() # now make commit3-6 commit2 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\nline3\n', message='commit3', vcs_type=backend.alias, parent=commit1) commit3 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\n', message='commit4', vcs_type=backend.alias, parent=commit2) commit4 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\nline5\n', message='commit5', vcs_type=backend.alias, parent=commit3) commit_change( # commit 5 repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\nline5\nline6\n', message='commit6', vcs_type=backend.alias, parent=commit4) response = self.app.get( route_path('repo_compare', repo_name=repo2.repo_name, # parent of commit2, in target repo2 source_ref_type="rev", source_ref=commit1.raw_id, target_ref_type="rev", target_ref=commit4.raw_id, params=dict(merge='1', target_repo=repo1.repo_name), )) response.mustcontain('%s@%s' % (repo2.repo_name, commit1.short_id)) response.mustcontain('%s@%s' % (repo1.repo_name, commit4.short_id)) # files compare_page = ComparePage(response) compare_page.contains_change_summary(1, 3, 0) compare_page.contains_commits([commit2, commit3, commit4]) anchor = 'a_c-{}-826e8142e6ba'.format(commit1.short_id) compare_page.contains_file_links_and_anchors([('file1', anchor),]) @pytest.mark.xfail_backends("svn") def test_compare_cherry_pick_commits_from_top(self, backend): # repo1: # commit0: # commit1: # repo1-fork- in which we will cherry pick bottom commits # commit0: # commit1: # commit2: # commit3: x # commit4: x # commit5: x # make repo1, and commit1+commit2 repo1 = backend.create_repo() # commit something ! commit0 = commit_change( repo1.repo_name, filename='file1', content='line1\n', message='commit1', vcs_type=backend.alias, parent=None, newfile=True) commit1 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\n', message='commit2', vcs_type=backend.alias, parent=commit0) # fork this repo backend.create_fork() # now make commit3-6 commit2 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\nline3\n', message='commit3', vcs_type=backend.alias, parent=commit1) commit3 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\n', message='commit4', vcs_type=backend.alias, parent=commit2) commit4 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\nline5\n', message='commit5', vcs_type=backend.alias, parent=commit3) commit5 = commit_change( repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\nline5\nline6\n', message='commit6', vcs_type=backend.alias, parent=commit4) response = self.app.get( route_path('repo_compare', repo_name=repo1.repo_name, # parent of commit3, not in source repo2 source_ref_type="rev", source_ref=commit2.raw_id, target_ref_type="rev", target_ref=commit5.raw_id, params=dict(merge='1'),)) response.mustcontain('%s@%s' % (repo1.repo_name, commit2.short_id)) response.mustcontain('%s@%s' % (repo1.repo_name, commit5.short_id)) compare_page = ComparePage(response) compare_page.contains_change_summary(1, 3, 0) compare_page.contains_commits([commit3, commit4, commit5]) # files anchor = 'a_c-{}-826e8142e6ba'.format(commit2.short_id) compare_page.contains_file_links_and_anchors([('file1', anchor),]) @pytest.mark.xfail_backends("svn") def test_compare_remote_branches(self, backend): repo1 = backend.repo repo2 = backend.create_fork() commit_id1 = repo1.get_commit(commit_idx=3).raw_id commit_id1_short = repo1.get_commit(commit_idx=3).short_id commit_id2 = repo1.get_commit(commit_idx=6).raw_id commit_id2_short = repo1.get_commit(commit_idx=6).short_id response = self.app.get( route_path('repo_compare', repo_name=repo1.repo_name, source_ref_type="rev", source_ref=commit_id1, target_ref_type="rev", target_ref=commit_id2, params=dict(merge='1', target_repo=repo2.repo_name), )) response.mustcontain('%s@%s' % (repo1.repo_name, commit_id1)) response.mustcontain('%s@%s' % (repo2.repo_name, commit_id2)) compare_page = ComparePage(response) # outgoing commits between those commits compare_page.contains_commits( [repo2.get_commit(commit_idx=x) for x in [4, 5, 6]]) # files compare_page.contains_file_links_and_anchors([ ('vcs/backends/hg.py', 'a_c-{}-9c390eb52cd6'.format(commit_id2_short)), ('vcs/backends/__init__.py', 'a_c-{}-41b41c1f2796'.format(commit_id1_short)), ('vcs/backends/base.py', 'a_c-{}-2f574d260608'.format(commit_id1_short)), ]) @pytest.mark.xfail_backends("svn") def test_source_repo_new_commits_after_forking_simple_diff(self, backend): repo1 = backend.create_repo() r1_name = repo1.repo_name commit0 = commit_change( repo=r1_name, filename='file1', content='line1', message='commit1', vcs_type=backend.alias, newfile=True) assert repo1.scm_instance().commit_ids == [commit0.raw_id] # fork the repo1 repo2 = backend.create_fork() assert repo2.scm_instance().commit_ids == [commit0.raw_id] self.r2_id = repo2.repo_id r2_name = repo2.repo_name commit1 = commit_change( repo=r2_name, filename='file1-fork', content='file1-line1-from-fork', message='commit1-fork', vcs_type=backend.alias, parent=repo2.scm_instance()[-1], newfile=True) commit2 = commit_change( repo=r2_name, filename='file2-fork', content='file2-line1-from-fork', message='commit2-fork', vcs_type=backend.alias, parent=commit1, newfile=True) commit_change( # commit 3 repo=r2_name, filename='file3-fork', content='file3-line1-from-fork', message='commit3-fork', vcs_type=backend.alias, parent=commit2, newfile=True) # compare ! commit_id1 = repo1.scm_instance().DEFAULT_BRANCH_NAME commit_id2 = repo2.scm_instance().DEFAULT_BRANCH_NAME response = self.app.get( route_path('repo_compare', repo_name=r2_name, source_ref_type="branch", source_ref=commit_id1, target_ref_type="branch", target_ref=commit_id2, params=dict(merge='1', target_repo=r1_name), )) response.mustcontain('%s@%s' % (r2_name, commit_id1)) response.mustcontain('%s@%s' % (r1_name, commit_id2)) response.mustcontain('No files') response.mustcontain('No commits in this compare') commit0 = commit_change( repo=r1_name, filename='file2', content='line1-added-after-fork', message='commit2-parent', vcs_type=backend.alias, parent=None, newfile=True) # compare ! response = self.app.get( route_path('repo_compare', repo_name=r2_name, source_ref_type="branch", source_ref=commit_id1, target_ref_type="branch", target_ref=commit_id2, params=dict(merge='1', target_repo=r1_name), )) response.mustcontain('%s@%s' % (r2_name, commit_id1)) response.mustcontain('%s@%s' % (r1_name, commit_id2)) response.mustcontain("""commit2-parent""") response.mustcontain("""line1-added-after-fork""") compare_page = ComparePage(response) compare_page.contains_change_summary(1, 1, 0) @pytest.mark.xfail_backends("svn") def test_compare_commits(self, backend, xhr_header): commit0 = backend.repo.get_commit(commit_idx=0) commit1 = backend.repo.get_commit(commit_idx=1) response = self.app.get( route_path('repo_compare', repo_name=backend.repo_name, source_ref_type="rev", source_ref=commit0.raw_id, target_ref_type="rev", target_ref=commit1.raw_id, params=dict(merge='1') ), extra_environ=xhr_header, ) # outgoing commits between those commits compare_page = ComparePage(response) compare_page.contains_commits(commits=[commit1]) def test_errors_when_comparing_unknown_source_repo(self, backend): repo = backend.repo badrepo = 'badrepo' response = self.app.get( route_path('repo_compare', repo_name=badrepo, source_ref_type="rev", source_ref='tip', target_ref_type="rev", target_ref='tip', params=dict(merge='1', target_repo=repo.repo_name) ), status=404) def test_errors_when_comparing_unknown_target_repo(self, backend): repo = backend.repo badrepo = 'badrepo' response = self.app.get( route_path('repo_compare', repo_name=repo.repo_name, source_ref_type="rev", source_ref='tip', target_ref_type="rev", target_ref='tip', params=dict(merge='1', target_repo=badrepo), ), status=302) redirected = response.follow() redirected.mustcontain( 'Could not find the target repo: `{}`'.format(badrepo)) def test_compare_not_in_preview_mode(self, backend_stub): commit0 = backend_stub.repo.get_commit(commit_idx=0) commit1 = backend_stub.repo.get_commit(commit_idx=1) response = self.app.get( route_path('repo_compare', repo_name=backend_stub.repo_name, source_ref_type="rev", source_ref=commit0.raw_id, target_ref_type="rev", target_ref=commit1.raw_id, )) # outgoing commits between those commits compare_page = ComparePage(response) compare_page.swap_is_visible() compare_page.target_source_are_enabled() def test_compare_of_fork_with_largefiles(self, backend_hg, settings_util): orig = backend_hg.create_repo(number_of_commits=1) fork = backend_hg.create_fork() settings_util.create_repo_rhodecode_ui( orig, 'extensions', value='', key='largefiles', active=False) settings_util.create_repo_rhodecode_ui( fork, 'extensions', value='', key='largefiles', active=True) compare_module = ('rhodecode.lib.vcs.backends.hg.repository.' 'MercurialRepository.compare') with mock.patch(compare_module) as compare_mock: compare_mock.side_effect = RepositoryRequirementError() response = self.app.get( route_path('repo_compare', repo_name=orig.repo_name, source_ref_type="rev", source_ref="tip", target_ref_type="rev", target_ref="tip", params=dict(merge='1', target_repo=fork.repo_name), ), status=302) assert_session_flash( response, 'Could not compare repos with different large file settings') @pytest.mark.usefixtures("autologin_user") class TestCompareControllerSvn(object): def test_supports_references_with_path(self, app, backend_svn): repo = backend_svn['svn-simple-layout'] commit_id = repo.get_commit(commit_idx=-1).raw_id response = app.get( route_path('repo_compare', repo_name=repo.repo_name, source_ref_type="tag", source_ref="%s@%s" % ('tags/v0.1', commit_id), target_ref_type="tag", target_ref="%s@%s" % ('tags/v0.2', commit_id), params=dict(merge='1'), ), status=200) # Expecting no commits, since both paths are at the same revision response.mustcontain('No commits in this compare') # Should find only one file changed when comparing those two tags response.mustcontain('example.py') compare_page = ComparePage(response) compare_page.contains_change_summary(1, 5, 1) def test_shows_commits_if_different_ids(self, app, backend_svn): repo = backend_svn['svn-simple-layout'] source_id = repo.get_commit(commit_idx=-6).raw_id target_id = repo.get_commit(commit_idx=-1).raw_id response = app.get( route_path('repo_compare', repo_name=repo.repo_name, source_ref_type="tag", source_ref="%s@%s" % ('tags/v0.1', source_id), target_ref_type="tag", target_ref="%s@%s" % ('tags/v0.2', target_id), params=dict(merge='1') ), status=200) # It should show commits assert 'No commits in this compare' not in response.text # Should find only one file changed when comparing those two tags response.mustcontain('example.py') compare_page = ComparePage(response) compare_page.contains_change_summary(1, 5, 1) class ComparePage(AssertResponse): """ Abstracts the page template from the tests """ def contains_file_links_and_anchors(self, files): doc = lxml.html.fromstring(self.response.body) for filename, file_id in files: self.contains_one_anchor(file_id) diffblock = doc.cssselect('[data-f-path="%s"]' % filename) assert len(diffblock) == 2 for lnk in diffblock[0].cssselect('a'): if 'permalink' in lnk.text: assert '#{}'.format(file_id) in lnk.attrib['href'] break else: pytest.fail('Unable to find permalink') def contains_change_summary(self, files_changed, inserted, deleted): template = ( '{files_changed} file{plural} changed: ' '{inserted} inserted, {deleted} deleted') self.response.mustcontain(template.format( files_changed=files_changed, plural="s" if files_changed > 1 else "", inserted=inserted, deleted=deleted)) def contains_commits(self, commits, ancestors=None): response = self.response for commit in commits: # Expecting to see the commit message in an element which # has the ID "c-{commit.raw_id}" self.element_contains('#c-' + commit.raw_id, commit.message) self.contains_one_link( 'r%s:%s' % (commit.idx, commit.short_id), self._commit_url(commit)) if ancestors: response.mustcontain('Ancestor') for ancestor in ancestors: self.contains_one_link( ancestor.short_id, self._commit_url(ancestor)) def _commit_url(self, commit): return '/%s/changeset/%s' % (commit.repository.name, commit.raw_id) def swap_is_hidden(self): assert '