# Copyright (C) 2010-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import logging import os import mock import pytest import rhodecode from rhodecode.lib import helpers as h from rhodecode.lib.vcs.backends.base import MergeResponse, MergeFailureReason, Reference from rhodecode.lib.vcs.nodes import FileNode from rhodecode.lib.ext_json import json from rhodecode.model.changeset_status import ChangesetStatusModel from rhodecode.model.db import ( PullRequest, ChangesetStatus, UserLog, Notification, ChangesetComment, Repository, ) from rhodecode.model.meta import Session from rhodecode.model.pull_request import PullRequestModel from rhodecode.model.user import UserModel from rhodecode.model.comment import CommentsModel from rhodecode.tests import ( assert_session_flash, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, ) from rhodecode.tests.fixtures.fixture_utils import PRTestUtility from rhodecode.tests.routes import route_path @pytest.mark.usefixtures("app", "autologin_user") @pytest.mark.backends("git", "hg") class TestPullrequestsView(object): def test_index(self, backend): self.app.get(route_path("pullrequest_new", repo_name=backend.repo_name)) def test_option_menu_create_pull_request_exists(self, backend): repo_name = backend.repo_name response = self.app.get(h.route_path("repo_summary", repo_name=repo_name)) create_pr_link = 'Create Pull Request' % route_path( "pullrequest_new", repo_name=repo_name ) response.mustcontain(create_pr_link) def test_create_pr_form_with_raw_commit_id(self, backend): repo = backend.repo self.app.get( route_path( "pullrequest_new", repo_name=repo.repo_name, commit=repo.get_commit().raw_id, ), status=200, ) @pytest.mark.parametrize("pr_merge_enabled", [True, False]) @pytest.mark.parametrize("range_diff", ["0", "1"]) def test_show(self, pr_util, pr_merge_enabled, range_diff): pull_request = pr_util.create_pull_request( mergeable=pr_merge_enabled, enable_notifications=False ) response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, params={"range-diff": range_diff}, ) ) for commit_id in pull_request.revisions: response.mustcontain(commit_id) response.mustcontain(pull_request.target_ref_parts.type) response.mustcontain(pull_request.target_ref_parts.name) response.mustcontain('class="pull-request-merge"') if pr_merge_enabled: response.mustcontain("Pull request reviewer approval is pending") else: response.mustcontain("Server-side pull request merging is disabled.") if range_diff == "1": response.mustcontain("Turn off: Show the diff as commit range") def test_show_versions_of_pr(self, backend, csrf_token): commits = [ { "message": "initial-commit", "added": [FileNode(b"test-file.txt", b"LINE1\n")], }, { "message": "commit-1", "changed": [FileNode(b"test-file.txt", b"LINE1\nLINE2\n")], }, # Above is the initial version of PR that changes a single line # from now on we'll add 3x commit adding a nother line on each step { "message": "commit-2", "changed": [FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\n")], }, { "message": "commit-3", "changed": [ FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\nLINE4\n") ], }, { "message": "commit-4", "changed": [ FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\nLINE4\nLINE5\n") ], }, ] commit_ids = backend.create_master_repo(commits) target = backend.create_repo(heads=["initial-commit"]) source = backend.create_repo(heads=["commit-1"]) source_repo_name = source.repo_name target_repo_name = target.repo_name target_ref = "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["initial-commit"] ) source_ref = "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["commit-1"] ) response = self.app.post( route_path("pullrequest_create", repo_name=source.repo_name), [ ("source_repo", source_repo_name), ("source_ref", source_ref), ("target_repo", target_repo_name), ("target_ref", target_ref), ("common_ancestor", commit_ids["initial-commit"]), ("pullrequest_title", "Title"), ("pullrequest_desc", "Description"), ("description_renderer", "markdown"), ("__start__", "review_members:sequence"), ("__start__", "reviewer:mapping"), ("user_id", "1"), ("__start__", "reasons:sequence"), ("reason", "Some reason"), ("__end__", "reasons:sequence"), ("__start__", "rules:sequence"), ("__end__", "rules:sequence"), ("mandatory", "False"), ("__end__", "reviewer:mapping"), ("__end__", "review_members:sequence"), ("__start__", "revisions:sequence"), ("revisions", commit_ids["commit-1"]), ("__end__", "revisions:sequence"), ("user", ""), ("csrf_token", csrf_token), ], status=302, ) location = response.headers["Location"] pull_request_id = location.rsplit("/", 1)[1] assert pull_request_id != "new" pull_request = PullRequest.get(int(pull_request_id)) pull_request_id = pull_request.pull_request_id # Show initial version of PR response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request_id, ) ) response.mustcontain("commit-1") response.mustcontain(no=["commit-2"]) response.mustcontain(no=["commit-3"]) response.mustcontain(no=["commit-4"]) response.mustcontain('cb-addition">LINE2') response.mustcontain(no=["LINE3"]) response.mustcontain(no=["LINE4"]) response.mustcontain(no=["LINE5"]) # update PR #1 source_repo = Repository.get_by_repo_name(source_repo_name) backend.pull_heads(source_repo, heads=["commit-2"]) response = self.app.post( route_path( "pullrequest_update", repo_name=target_repo_name, pull_request_id=pull_request_id, ), params={"update_commits": "true", "csrf_token": csrf_token}, ) # update PR #2 source_repo = Repository.get_by_repo_name(source_repo_name) backend.pull_heads(source_repo, heads=["commit-3"]) response = self.app.post( route_path( "pullrequest_update", repo_name=target_repo_name, pull_request_id=pull_request_id, ), params={"update_commits": "true", "csrf_token": csrf_token}, ) # update PR #3 source_repo = Repository.get_by_repo_name(source_repo_name) backend.pull_heads(source_repo, heads=["commit-4"]) response = self.app.post( route_path( "pullrequest_update", repo_name=target_repo_name, pull_request_id=pull_request_id, ), params={"update_commits": "true", "csrf_token": csrf_token}, ) # Show final version ! response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request_id, ) ) # 3 updates, and the latest == 4 response.mustcontain("4 versions available for this pull request") response.mustcontain(no=["rhodecode diff rendering error"]) # initial show must have 3 commits, and 3 adds response.mustcontain("commit-1") response.mustcontain("commit-2") response.mustcontain("commit-3") response.mustcontain("commit-4") response.mustcontain('cb-addition">LINE2') response.mustcontain('cb-addition">LINE3') response.mustcontain('cb-addition">LINE4') response.mustcontain('cb-addition">LINE5') # fetch versions pr = PullRequest.get(pull_request_id) versions = [x.pull_request_version_id for x in pr.versions.all()] assert len(versions) == 3 # show v1,v2,v3,v4 def cb_line(text): return 'cb-addition">{}'.format(text) def cb_context(text): return ( '' "{}".format(text) ) commit_tests = { # in response, not in response 1: (["commit-1"], ["commit-2", "commit-3", "commit-4"]), 2: (["commit-1", "commit-2"], ["commit-3", "commit-4"]), 3: (["commit-1", "commit-2", "commit-3"], ["commit-4"]), 4: (["commit-1", "commit-2", "commit-3", "commit-4"], []), } diff_tests = { 1: (["LINE2"], ["LINE3", "LINE4", "LINE5"]), 2: (["LINE2", "LINE3"], ["LINE4", "LINE5"]), 3: (["LINE2", "LINE3", "LINE4"], ["LINE5"]), 4: (["LINE2", "LINE3", "LINE4", "LINE5"], []), } for idx, ver in enumerate(versions, 1): response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request_id, params={"version": ver}, ) ) response.mustcontain(no=["rhodecode diff rendering error"]) response.mustcontain("Showing changes at v{}".format(idx)) yes, no = commit_tests[idx] for y in yes: response.mustcontain(y) for n in no: response.mustcontain(no=n) yes, no = diff_tests[idx] for y in yes: response.mustcontain(cb_line(y)) for n in no: response.mustcontain(no=n) # show diff between versions diff_compare_tests = { 1: (["LINE3"], ["LINE1", "LINE2"]), 2: (["LINE3", "LINE4"], ["LINE1", "LINE2"]), 3: (["LINE3", "LINE4", "LINE5"], ["LINE1", "LINE2"]), } for idx, ver in enumerate(versions, 1): adds, context = diff_compare_tests[idx] to_ver = ver + 1 if idx == 3: to_ver = "latest" response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request_id, params={"from_version": versions[0], "version": to_ver}, ) ) response.mustcontain(no=["rhodecode diff rendering error"]) for a in adds: response.mustcontain(cb_line(a)) for c in context: response.mustcontain(cb_context(c)) # test version v2 -> v3 response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request_id, params={"from_version": versions[1], "version": versions[2]}, ) ) response.mustcontain(cb_context("LINE1")) response.mustcontain(cb_context("LINE2")) response.mustcontain(cb_context("LINE3")) response.mustcontain(cb_line("LINE4")) def test_close_status_visibility(self, pr_util, user_util, csrf_token): # Logout response = self.app.post( h.route_path("logout"), params={"csrf_token": csrf_token} ) # Login as regular user response = self.app.post( h.route_path("login"), {"username": TEST_USER_REGULAR_LOGIN, "password": "test12"}, ) pull_request = pr_util.create_pull_request(author=TEST_USER_REGULAR_LOGIN) response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) response.mustcontain("Server-side pull request merging is disabled.") assert_response = response.assert_response() # for regular user without a merge permissions, we don't see it assert_response.no_element_exists("#close-pull-request-action") user_util.grant_user_permission_to_repo( pull_request.target_repo, UserModel().get_by_username(TEST_USER_REGULAR_LOGIN), "repository.write", ) response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) response.mustcontain("Server-side pull request merging is disabled.") assert_response = response.assert_response() # now regular user has a merge permissions, we have CLOSE button assert_response.one_element_exists("#close-pull-request-action") def test_show_invalid_commit_id(self, pr_util): # Simulating invalid revisions which will cause a lookup error pull_request = pr_util.create_pull_request() pull_request.revisions = ["invalid"] Session().add(pull_request) Session().commit() response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) for commit_id in pull_request.revisions: response.mustcontain(commit_id) def test_show_invalid_source_reference(self, pr_util): pull_request = pr_util.create_pull_request() pull_request.source_ref = "branch:b:invalid" Session().add(pull_request) Session().commit() self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) def test_edit_title_description(self, pr_util, csrf_token): pull_request = pr_util.create_pull_request() pull_request_id = pull_request.pull_request_id response = self.app.post( route_path( "pullrequest_update", repo_name=pull_request.target_repo.repo_name, pull_request_id=pull_request_id, ), params={ "edit_pull_request": "true", "title": "New title", "description": "New description", "csrf_token": csrf_token, }, ) assert_session_flash( response, "Pull request title & description updated.", category="success" ) pull_request = PullRequest.get(pull_request_id) assert pull_request.title == "New title" assert pull_request.description == "New description" def test_edit_title_description_special(self, pr_util, csrf_token): pull_request = pr_util.create_pull_request() pull_request_id = pull_request.pull_request_id response = self.app.post( route_path( "pullrequest_update", repo_name=pull_request.target_repo.repo_name, pull_request_id=pull_request_id, ), params={ "edit_pull_request": "true", "title": "New title {} {2} {foo}", "description": "New description", "csrf_token": csrf_token, }, ) assert_session_flash( response, "Pull request title & description updated.", category="success" ) pull_request = PullRequest.get(pull_request_id) assert pull_request.title_safe == "New title {{}} {{2}} {{foo}}" def test_edit_title_description_closed(self, pr_util, csrf_token): pull_request = pr_util.create_pull_request() pull_request_id = pull_request.pull_request_id repo_name = pull_request.target_repo.repo_name pr_util.close() response = self.app.post( route_path( "pullrequest_update", repo_name=repo_name, pull_request_id=pull_request_id, ), params={ "edit_pull_request": "true", "title": "New title", "description": "New description", "csrf_token": csrf_token, }, status=200, ) assert_session_flash( response, "Cannot update closed pull requests.", category="error" ) def test_update_invalid_source_reference(self, pr_util, csrf_token): from rhodecode.lib.vcs.backends.base import UpdateFailureReason pull_request = pr_util.create_pull_request() pull_request.source_ref = "branch:invalid-branch:invalid-commit-id" Session().add(pull_request) Session().commit() pull_request_id = pull_request.pull_request_id response = self.app.post( route_path( "pullrequest_update", repo_name=pull_request.target_repo.repo_name, pull_request_id=pull_request_id, ), params={"update_commits": "true", "csrf_token": csrf_token}, ) expected_msg = str( PullRequestModel.UPDATE_STATUS_MESSAGES[ UpdateFailureReason.MISSING_SOURCE_REF ] ) assert_session_flash(response, expected_msg, category="error") def test_missing_target_reference(self, pr_util, csrf_token): from rhodecode.lib.vcs.backends.base import MergeFailureReason pull_request = pr_util.create_pull_request(approved=True, mergeable=True) unicode_reference = "branch:invalid-branch:invalid-commit-id" pull_request.target_ref = unicode_reference Session().add(pull_request) Session().commit() pull_request_id = pull_request.pull_request_id pull_request_url = route_path( "pullrequest_show", repo_name=pull_request.target_repo.repo_name, pull_request_id=pull_request_id, ) response = self.app.get(pull_request_url) # target_ref_id = "invalid-branch" merge_resp = MergeResponse( True, True, Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"), MergeFailureReason.MISSING_TARGET_REF, metadata={ "target_ref": PullRequest.unicode_to_reference(unicode_reference) }, ) response.assert_response().element_contains( 'div[data-role="merge-message"]', merge_resp.merge_status_message ) def test_comment_and_close_pull_request_custom_message_approved( self, pr_util, csrf_token, xhr_header ): pull_request = pr_util.create_pull_request(approved=True) pull_request_id = pull_request.pull_request_id author = pull_request.user_id repo = pull_request.target_repo.repo_id self.app.post( route_path( "pullrequest_comment_create", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request_id, ), params={ "close_pull_request": "1", "text": "Closing a PR", "csrf_token": csrf_token, }, extra_environ=xhr_header, ) journal = ( UserLog.query() .filter(UserLog.user_id == author) .filter(UserLog.repository_id == repo) .order_by(UserLog.user_log_id.asc()) .all() ) assert journal[-1].action == "repo.pull_request.close" pull_request = PullRequest.get(pull_request_id) assert pull_request.is_closed() status = ChangesetStatusModel().get_status( pull_request.source_repo, pull_request=pull_request ) assert status == ChangesetStatus.STATUS_APPROVED comments = ( ChangesetComment() .query() .filter(ChangesetComment.pull_request == pull_request) .order_by(ChangesetComment.comment_id.asc()) .all() ) assert comments[-1].text == "Closing a PR" def test_comment_force_close_pull_request_rejected( self, pr_util, csrf_token, xhr_header ): pull_request = pr_util.create_pull_request() pull_request_id = pull_request.pull_request_id PullRequestModel().update_reviewers( pull_request_id, [ (1, ["reason"], False, "reviewer", []), (2, ["reason2"], False, "reviewer", []), ], pull_request.author, ) author = pull_request.user_id repo = pull_request.target_repo.repo_id self.app.post( route_path( "pullrequest_comment_create", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request_id, ), params={"close_pull_request": "1", "csrf_token": csrf_token}, extra_environ=xhr_header, ) pull_request = PullRequest.get(pull_request_id) journal = ( UserLog.query() .filter(UserLog.user_id == author, UserLog.repository_id == repo) .order_by(UserLog.user_log_id.asc()) .all() ) assert journal[-1].action == "repo.pull_request.close" # check only the latest status, not the review status status = ChangesetStatusModel().get_status( pull_request.source_repo, pull_request=pull_request ) assert status == ChangesetStatus.STATUS_REJECTED def test_comment_and_close_pull_request(self, pr_util, csrf_token, xhr_header): pull_request = pr_util.create_pull_request() pull_request_id = pull_request.pull_request_id response = self.app.post( route_path( "pullrequest_comment_create", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ), params={"close_pull_request": "true", "csrf_token": csrf_token}, extra_environ=xhr_header, ) assert response.json pull_request = PullRequest.get(pull_request_id) assert pull_request.is_closed() # check only the latest status, not the review status status = ChangesetStatusModel().get_status( pull_request.source_repo, pull_request=pull_request ) assert status == ChangesetStatus.STATUS_REJECTED def test_comment_and_close_pull_request_try_edit_comment( self, pr_util, csrf_token, xhr_header ): pull_request = pr_util.create_pull_request() pull_request_id = pull_request.pull_request_id target_scm = pull_request.target_repo.scm_instance() target_scm_name = target_scm.name response = self.app.post( route_path( "pullrequest_comment_create", repo_name=target_scm_name, pull_request_id=pull_request_id, ), params={ "close_pull_request": "true", "csrf_token": csrf_token, }, extra_environ=xhr_header, ) assert response.json pull_request = PullRequest.get(pull_request_id) target_scm = pull_request.target_repo.scm_instance() target_scm_name = target_scm.name assert pull_request.is_closed() # check only the latest status, not the review status status = ChangesetStatusModel().get_status( pull_request.source_repo, pull_request=pull_request ) assert status == ChangesetStatus.STATUS_REJECTED for comment_id in response.json.keys(): test_text = "test" response = self.app.post( route_path( "pullrequest_comment_edit", repo_name=target_scm_name, pull_request_id=pull_request_id, comment_id=comment_id, ), extra_environ=xhr_header, params={ "csrf_token": csrf_token, "text": test_text, }, status=403, ) assert response.status_int == 403 def test_comment_and_comment_edit(self, pr_util, csrf_token, xhr_header): pull_request = pr_util.create_pull_request() target_scm = pull_request.target_repo.scm_instance() target_scm_name = target_scm.name response = self.app.post( route_path( "pullrequest_comment_create", repo_name=target_scm_name, pull_request_id=pull_request.pull_request_id, ), params={ "csrf_token": csrf_token, "text": "init", }, extra_environ=xhr_header, ) assert response.json for comment_id in response.json.keys(): assert comment_id test_text = "test" self.app.post( route_path( "pullrequest_comment_edit", repo_name=target_scm_name, pull_request_id=pull_request.pull_request_id, comment_id=comment_id, ), extra_environ=xhr_header, params={ "csrf_token": csrf_token, "text": test_text, "version": "0", }, ) text_form_db = ( ChangesetComment.query() .filter(ChangesetComment.comment_id == comment_id) .first() .text ) assert test_text == text_form_db def test_comment_and_comment_edit_special(self, pr_util, csrf_token, xhr_header): pull_request = pr_util.create_pull_request() target_scm = pull_request.target_repo.scm_instance() target_scm_name = target_scm.name response = self.app.post( route_path( "pullrequest_comment_create", repo_name=target_scm_name, pull_request_id=pull_request.pull_request_id, ), params={ "csrf_token": csrf_token, "text": "init", }, extra_environ=xhr_header, ) assert response.json for comment_id in response.json.keys(): test_text = "init" response = self.app.post( route_path( "pullrequest_comment_edit", repo_name=target_scm_name, pull_request_id=pull_request.pull_request_id, comment_id=comment_id, ), extra_environ=xhr_header, params={ "csrf_token": csrf_token, "text": test_text, "version": "0", }, status=404, ) assert response.status_int == 404 def test_comment_and_try_edit_already_edited(self, pr_util, csrf_token, xhr_header): pull_request = pr_util.create_pull_request() target_scm = pull_request.target_repo.scm_instance() target_scm_name = target_scm.name response = self.app.post( route_path( "pullrequest_comment_create", repo_name=target_scm_name, pull_request_id=pull_request.pull_request_id, ), params={ "csrf_token": csrf_token, "text": "init", }, extra_environ=xhr_header, ) assert response.json for comment_id in response.json.keys(): test_text = "test" self.app.post( route_path( "pullrequest_comment_edit", repo_name=target_scm_name, pull_request_id=pull_request.pull_request_id, comment_id=comment_id, ), extra_environ=xhr_header, params={ "csrf_token": csrf_token, "text": test_text, "version": "0", }, ) test_text_v2 = "test_v2" response = self.app.post( route_path( "pullrequest_comment_edit", repo_name=target_scm_name, pull_request_id=pull_request.pull_request_id, comment_id=comment_id, ), extra_environ=xhr_header, params={ "csrf_token": csrf_token, "text": test_text_v2, "version": "0", }, status=409, ) assert response.status_int == 409 text_form_db = ( ChangesetComment.query() .filter(ChangesetComment.comment_id == comment_id) .first() .text ) assert test_text == text_form_db assert test_text_v2 != text_form_db def test_comment_and_comment_edit_permissions_forbidden( self, autologin_regular_user, user_regular, user_admin, pr_util, csrf_token, xhr_header, ): pull_request = pr_util.create_pull_request( author=user_admin.username, enable_notifications=False ) comment = CommentsModel().create( text="test", repo=pull_request.target_repo.scm_instance().name, user=user_admin, pull_request=pull_request, ) response = self.app.post( route_path( "pullrequest_comment_edit", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, comment_id=comment.comment_id, ), extra_environ=xhr_header, params={ "csrf_token": csrf_token, "text": "test_text", }, status=403, ) assert response.status_int == 403 def test_create_pull_request(self, backend, csrf_token): commits = [ {"message": "ancestor"}, {"message": "change"}, {"message": "change2"}, ] commit_ids = backend.create_master_repo(commits) target = backend.create_repo(heads=["ancestor"]) source = backend.create_repo(heads=["change2"]) response = self.app.post( route_path("pullrequest_create", repo_name=source.repo_name), [ ("source_repo", source.repo_name), ("source_ref", "branch:default:" + commit_ids["change2"]), ("target_repo", target.repo_name), ("target_ref", "branch:default:" + commit_ids["ancestor"]), ("common_ancestor", commit_ids["ancestor"]), ("pullrequest_title", "Title"), ("pullrequest_desc", "Description"), ("description_renderer", "markdown"), ("__start__", "review_members:sequence"), ("__start__", "reviewer:mapping"), ("user_id", "1"), ("__start__", "reasons:sequence"), ("reason", "Some reason"), ("__end__", "reasons:sequence"), ("__start__", "rules:sequence"), ("__end__", "rules:sequence"), ("mandatory", "False"), ("__end__", "reviewer:mapping"), ("__end__", "review_members:sequence"), ("__start__", "revisions:sequence"), ("revisions", commit_ids["change"]), ("revisions", commit_ids["change2"]), ("__end__", "revisions:sequence"), ("user", ""), ("csrf_token", csrf_token), ], status=302, ) location = response.headers["Location"] pull_request_id = location.rsplit("/", 1)[1] assert pull_request_id != "new" pull_request = PullRequest.get(int(pull_request_id)) # check that we have now both revisions assert pull_request.revisions == [commit_ids["change2"], commit_ids["change"]] assert pull_request.source_ref == "branch:default:" + commit_ids["change2"] expected_target_ref = "branch:default:" + commit_ids["ancestor"] assert pull_request.target_ref == expected_target_ref def test_reviewer_notifications(self, backend, csrf_token): # We have to use the app.post for this test, so it will create the # notifications properly with the new PR commits = [ { "message": "ancestor", "added": [FileNode(b"file_A", content=b"content_of_ancestor")], }, { "message": "change", "added": [FileNode(b"file_a", content=b"content_of_change")], }, {"message": "change-child"}, { "message": "ancestor-child", "parents": ["ancestor"], "branch": "feature", "added": [FileNode(b"file_c", content=b"content_of_ancestor_child")], }, {"message": "ancestor-child-2", "branch": "feature"}, ] commit_ids = backend.create_master_repo(commits) target = backend.create_repo(heads=["ancestor-child"]) source = backend.create_repo(heads=["change"]) response = self.app.post( route_path("pullrequest_create", repo_name=source.repo_name), [ ("source_repo", source.repo_name), ("source_ref", "branch:default:" + commit_ids["change"]), ("target_repo", target.repo_name), ("target_ref", "branch:default:" + commit_ids["ancestor-child"]), ("common_ancestor", commit_ids["ancestor"]), ("pullrequest_title", "Title"), ("pullrequest_desc", "Description"), ("description_renderer", "markdown"), ("__start__", "review_members:sequence"), ("__start__", "reviewer:mapping"), ("user_id", "2"), ("__start__", "reasons:sequence"), ("reason", "Some reason"), ("__end__", "reasons:sequence"), ("__start__", "rules:sequence"), ("__end__", "rules:sequence"), ("mandatory", "False"), ("__end__", "reviewer:mapping"), ("__end__", "review_members:sequence"), ("__start__", "revisions:sequence"), ("revisions", commit_ids["change"]), ("__end__", "revisions:sequence"), ("user", ""), ("csrf_token", csrf_token), ], status=302, ) location = response.headers["Location"] pull_request_id = location.rsplit("/", 1)[1] assert pull_request_id != "new" pull_request = PullRequest.get(int(pull_request_id)) # Check that a notification was made notifications = Notification.query().filter( Notification.created_by == pull_request.author.user_id, Notification.type_ == Notification.TYPE_PULL_REQUEST, Notification.subject.contains( "requested a pull request review. !%s" % pull_request_id ), ) assert len(notifications.all()) == 1 # Change reviewers and check that a notification was made PullRequestModel().update_reviewers( pull_request.pull_request_id, [(1, [], False, "reviewer", [])], pull_request.author, ) assert len(notifications.all()) == 2 def test_create_pull_request_stores_ancestor_commit_id(self, backend, csrf_token): commits = [ { "message": "ancestor", "added": [FileNode(b"file_A", content=b"content_of_ancestor")], }, { "message": "change", "added": [FileNode(b"file_a", content=b"content_of_change")], }, { "message": "change-child", "added": [FileNode(b"file_c", content=b"content_of_change_2")], }, { "message": "ancestor-child", "parents": ["ancestor"], "branch": "feature", "added": [FileNode(b"file_B", content=b"content_of_ancestor_child")], }, {"message": "ancestor-child-2", "branch": "feature"}, ] commit_ids = backend.create_master_repo(commits) target = backend.create_repo(heads=["ancestor-child"]) source = backend.create_repo(heads=["change"]) response = self.app.post( route_path("pullrequest_create", repo_name=source.repo_name), [ ("source_repo", source.repo_name), ("source_ref", "branch:default:" + commit_ids["change"]), ("target_repo", target.repo_name), ("target_ref", "branch:default:" + commit_ids["ancestor-child"]), ("common_ancestor", commit_ids["ancestor"]), ("pullrequest_title", "Title"), ("pullrequest_desc", "Description"), ("description_renderer", "markdown"), ("__start__", "review_members:sequence"), ("__start__", "reviewer:mapping"), ("user_id", "1"), ("__start__", "reasons:sequence"), ("reason", "Some reason"), ("__end__", "reasons:sequence"), ("__start__", "rules:sequence"), ("__end__", "rules:sequence"), ("mandatory", "False"), ("__end__", "reviewer:mapping"), ("__end__", "review_members:sequence"), ("__start__", "revisions:sequence"), ("revisions", commit_ids["change"]), ("__end__", "revisions:sequence"), ("user", ""), ("csrf_token", csrf_token), ], status=302, ) location = response.headers["Location"] pull_request_id = location.rsplit("/", 1)[1] assert pull_request_id != "new" pull_request = PullRequest.get(int(pull_request_id)) # target_ref has to point to the ancestor's commit_id in order to # show the correct diff expected_target_ref = "branch:default:" + commit_ids["ancestor"] assert pull_request.target_ref == expected_target_ref # Check generated diff contents response = response.follow() response.mustcontain(no=["content_of_ancestor"]) response.mustcontain(no=["content_of_ancestor-child"]) response.mustcontain("content_of_change") def test_merge_pull_request_enabled(self, pr_util, csrf_token, rcextensions_modification): pull_request = pr_util.create_pull_request(approved=True, mergeable=True) pull_request_id = pull_request.pull_request_id repo_name = pull_request.target_repo.scm_instance().name url = route_path( "pullrequest_merge", repo_name=repo_name, pull_request_id=pull_request_id, ) rcstack_location = os.path.dirname(self.app._pyramid_registry.settings['__file__']) rc_ext_location = os.path.join(rcstack_location, 'rcextension-output.txt') mods = [ ('_push_hook', f""" import os action = kwargs['action'] commit_ids = kwargs['commit_ids'] with open('{rc_ext_location}', 'w') as f: f.write('test-execution'+os.linesep) f.write(f'{{action}}'+os.linesep) f.write(f'{{commit_ids}}'+os.linesep) return HookResponse(0, 'HOOK_TEST') """) ] # Add the hook with rcextensions_modification(rcstack_location, mods, create_if_missing=True, force_create=True): response = self.app.post(url, params={"csrf_token": csrf_token}).follow() pull_request = PullRequest.get(pull_request_id) assert response.status_int == 200 assert pull_request.is_closed() assert_pull_request_status(pull_request, ChangesetStatus.STATUS_APPROVED) # Check the relevant log entries were added user_logs = UserLog.query().order_by(UserLog.user_log_id.desc()).limit(3) actions = [log.action for log in user_logs] pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request) expected_actions = [ "repo.pull_request.close", "repo.pull_request.merge", "repo.pull_request.comment.create", ] assert actions == expected_actions user_logs = UserLog.query().order_by(UserLog.user_log_id.desc()).limit(4) actions = [log for log in user_logs] assert actions[-1].action == "user.push" assert actions[-1].action_data["commit_ids"] == pr_commit_ids with open(rc_ext_location) as f: f_data = f.read() assert 'test-execution' in f_data for commit_id in pr_commit_ids: assert f'{commit_id}' in f_data def test_merge_pull_request_forbidden_by_pre_push_hook(self, pr_util, csrf_token, rcextensions_modification, caplog): caplog.set_level(logging.WARNING, logger="rhodecode.model.pull_request") pull_request = pr_util.create_pull_request(approved=True, mergeable=True) pull_request_id = pull_request.pull_request_id repo_name = pull_request.target_repo.scm_instance().name url = route_path( "pullrequest_merge", repo_name=repo_name, pull_request_id=pull_request_id, ) rcstack_location = os.path.dirname(self.app._pyramid_registry.settings['__file__']) mods = [ ('_pre_push_hook', f""" return HookResponse(1, 'HOOK_TEST_FORBIDDEN') """) ] # Add the hook with rcextensions_modification(rcstack_location, mods, create_if_missing=True, force_create=True): self.app.post(url, params={"csrf_token": csrf_token}) assert 'Merge failed, not updating the pull request.' in [r[2] for r in caplog.record_tuples] def test_merge_pull_request_disabled(self, pr_util, csrf_token): pull_request = pr_util.create_pull_request(mergeable=False) pull_request_id = pull_request.pull_request_id pull_request = PullRequest.get(pull_request_id) response = self.app.post( route_path( "pullrequest_merge", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ), params={"csrf_token": csrf_token}, ).follow() assert response.status_int == 200 response.mustcontain( "Merge is not currently possible because of below failed checks." ) response.mustcontain("Server-side pull request merging is disabled.") @pytest.mark.skip_backends("svn") def test_merge_pull_request_not_approved(self, pr_util, csrf_token): pull_request = pr_util.create_pull_request(mergeable=True) pull_request_id = pull_request.pull_request_id repo_name = pull_request.target_repo.scm_instance().name response = self.app.post( route_path( "pullrequest_merge", repo_name=repo_name, pull_request_id=pull_request_id, ), params={"csrf_token": csrf_token}, ).follow() assert response.status_int == 200 response.mustcontain( "Merge is not currently possible because of below failed checks." ) response.mustcontain("Pull request reviewer approval is pending.") def test_merge_pull_request_renders_failure_reason( self, user_regular, csrf_token, pr_util ): pull_request = pr_util.create_pull_request(mergeable=True, approved=True) pull_request_id = pull_request.pull_request_id repo_name = pull_request.target_repo.scm_instance().name merge_resp = MergeResponse( True, False, Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"), MergeFailureReason.PUSH_FAILED, metadata={"target": "shadow repo", "merge_commit": "xxx"}, ) model_patcher = mock.patch.multiple( PullRequestModel, merge_repo=mock.Mock(return_value=merge_resp), merge_status=mock.Mock(return_value=(None, True, "WRONG_MESSAGE")), ) with model_patcher: response = self.app.post( route_path( "pullrequest_merge", repo_name=repo_name, pull_request_id=pull_request_id, ), params={"csrf_token": csrf_token}, status=302, ) merge_resp = MergeResponse( True, True, Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"), MergeFailureReason.PUSH_FAILED, metadata={"target": "shadow repo", "merge_commit": "xxx"}, ) assert_session_flash(response, merge_resp.merge_status_message) def test_update_source_revision(self, backend, csrf_token): commits = [ {"message": "ancestor"}, {"message": "change"}, {"message": "change-2"}, ] commit_ids = backend.create_master_repo(commits) target = backend.create_repo(heads=["ancestor"]) source = backend.create_repo(heads=["change"]) # create pr from a in source to A in target pull_request = PullRequest() pull_request.source_repo = source pull_request.source_ref = "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["change"] ) pull_request.target_repo = target pull_request.target_ref = "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["ancestor"] ) pull_request.revisions = [commit_ids["change"]] pull_request.title = "Test" pull_request.description = "Description" pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) pull_request.pull_request_state = PullRequest.STATE_CREATED Session().add(pull_request) Session().commit() pull_request_id = pull_request.pull_request_id # source has ancestor - change - change-2 backend.pull_heads(source, heads=["change-2"]) target_repo_name = target.repo_name # update PR self.app.post( route_path( "pullrequest_update", repo_name=target_repo_name, pull_request_id=pull_request_id, ), params={"update_commits": "true", "csrf_token": csrf_token}, ) response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request.pull_request_id, ) ) assert response.status_int == 200 response.mustcontain("Pull request updated to") response.mustcontain("with 1 added, 0 removed commits.") # check that we have now both revisions pull_request = PullRequest.get(pull_request_id) assert pull_request.revisions == [commit_ids["change-2"], commit_ids["change"]] def test_update_target_revision(self, backend, csrf_token): """ Checks when we add more commits into a target branch, and update PR """ commits = [ {"message": "commit-a"}, # main branch (our PR target) {"message": "commit-b"}, # Initial source {"message": "commit-c"}, {"message": "commit-a-prime", "branch": "feature", "parents": ["commit-a"]}, # main branch (source) ] commit_ids = backend.create_master_repo(commits) target = backend.create_repo(heads=["commit-a"]) source = backend.create_repo(heads=["commit-b"]) target_repo_name = target.repo_name # create pr from commit-b to commit-a pull_request = PullRequest() pull_request.target_repo = target pull_request.target_ref = "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["commit-a"] ) pull_request.source_repo = source pull_request.source_ref = "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["commit-b"] ) pull_request.revisions = [commit_ids["commit-b"]] pull_request.title = "Test" pull_request.description = "Description" pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) pull_request.pull_request_state = PullRequest.STATE_CREATED Session().add(pull_request) Session().commit() pull_request_id = pull_request.pull_request_id # target - add one commit on top commit-a -> commit-b backend.pull_heads(target, heads=["commit-b"]) # source has ancestor - ancestor-new - change-rebased backend.pull_heads(source, heads=["commit-c"]) # update PR url = route_path( "pullrequest_update", repo_name=target_repo_name, pull_request_id=pull_request_id, ) self.app.post( url, params={"update_commits": "true", "csrf_token": csrf_token}, status=200 ) # check that we have now both revisions pull_request = PullRequest.get(pull_request_id) assert pull_request.revisions == [commit_ids["commit-c"]] assert pull_request.target_ref == "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["commit-b"] ) response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request.pull_request_id, ) ) assert response.status_int == 200 response.mustcontain("Pull request updated to") response.mustcontain("with 1 added, 1 removed commits.") def test_update_target_revision_with_removal_of_1_commit_git( self, backend_git, csrf_token ): backend = backend_git commits = [ {"message": "master-commit-1"}, {"message": "master-commit-2-change-1"}, {"message": "master-commit-3-change-2"}, { "message": "feat-commit-1", "parents": ["master-commit-1"], "branch": "feature", }, {"message": "feat-commit-2", "branch": "feature"}, ] commit_ids = backend.create_master_repo(commits) target = backend.create_repo(heads=["master-commit-3-change-2"]) source = backend.create_repo(heads=["feat-commit-2"]) # create pr from a in source to A in target pull_request = PullRequest() pull_request.source_repo = source pull_request.source_ref = "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["master-commit-3-change-2"], ) pull_request.target_repo = target pull_request.target_ref = "branch:{branch}:{commit_id}".format( branch=backend.default_branch_name, commit_id=commit_ids["feat-commit-2"] ) pull_request.revisions = [ commit_ids["feat-commit-1"], commit_ids["feat-commit-2"], ] pull_request.title = "Test" pull_request.description = "Description" pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) pull_request.pull_request_state = PullRequest.STATE_CREATED Session().add(pull_request) Session().commit() pull_request_id = pull_request.pull_request_id # PR is created, now we simulate a force-push into target, # that drops a 2 last commits vcsrepo = target.scm_instance() vcsrepo.config.clear_section("hooks") vcsrepo.run_git_command(["reset", "--soft", "HEAD~2"]) target_repo_name = target.repo_name # update PR url = route_path( "pullrequest_update", repo_name=target_repo_name, pull_request_id=pull_request_id, ) self.app.post( url, params={"update_commits": "true", "csrf_token": csrf_token}, status=200 ) response = self.app.get( route_path("pullrequest_new", repo_name=target_repo_name) ) assert response.status_int == 200 response.mustcontain("Pull request updated to") response.mustcontain("with 0 added, 0 removed commits.") @pytest.mark.xfail(reason="unable to fix after pygit update") def test_update_pr_ancestor_reference(self, csrf_token, pr_util: PRTestUtility): commits = [ {"message": "ancestor"}, {"message": "change"}, {"message": "change-2"}, {"message": "ancestor-new", "parents": ["ancestor"], "branch": "feature"}, {"message": "change-rebased", "branch": "feature"}, ] pull_request = pr_util.create_pull_request( commits, target_head="ancestor", source_head="change", revisions=["change"], ) pull_request_id = pull_request.pull_request_id target_repo_name = pr_util.target_repository.repo_name commit_ids = pr_util.commit_ids assert pull_request.revisions == [commit_ids["change"]] assert list(pull_request.target_repo.scm_instance(cache=False).branches.keys()) == [pr_util.backend.default_branch_name] assert list(pull_request.source_repo.scm_instance(cache=False).branches.keys()) == [pr_util.backend.default_branch_name] branch = "feature" pr_util.update_target_repository(head="ancestor-new", do_fetch=True) pr_util.set_pr_target_ref(ref_type="branch", ref_name=branch, ref_commit_id=commit_ids["ancestor-new"]) pr_util.update_source_repository(head="change-rebased", do_fetch=True) pr_util.set_pr_source_ref(ref_type="branch", ref_name=branch, ref_commit_id=commit_ids["change-rebased"]) assert list(pull_request.target_repo.scm_instance(cache=False).branches.keys()) == [pr_util.backend.default_branch_name, branch] assert list(pull_request.source_repo.scm_instance(cache=False).branches.keys()) == [pr_util.backend.default_branch_name, branch] Session().add(pr_util.pull_request) Session().commit() self.app.post( route_path( "pullrequest_update", repo_name=target_repo_name, pull_request_id=pull_request_id, ), params={"update_commits": "true", "csrf_token": csrf_token, "force_refresh": True}, status=200, ) # response = self.app.get( # route_path( # "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request_id, # params={"force_refresh": True} # ), # ) # # response.mustcontain("Pull request updated to") # response.mustcontain("with 1 added, 0 removed commits.") pull_request = PullRequest.get(pull_request_id) assert pull_request.target_ref == "branch:{branch}:{commit_id}".format( branch="feature", commit_id=commit_ids["ancestor-new"]) assert pull_request.revisions == [commit_ids["change-rebased"]] def test_remove_pull_request_branch(self, backend_git, csrf_token): branch_name = "development" commits = [ {"message": "initial-commit"}, {"message": "old-feature"}, {"message": "new-feature", "branch": branch_name}, ] repo = backend_git.create_repo(commits) repo_name = repo.repo_name commit_ids = backend_git.commit_ids pull_request = PullRequest() pull_request.source_repo = repo pull_request.target_repo = repo pull_request.source_ref = "branch:{branch}:{commit_id}".format( branch=branch_name, commit_id=commit_ids["new-feature"] ) pull_request.target_ref = "branch:{branch}:{commit_id}".format( branch=backend_git.default_branch_name, commit_id=commit_ids["old-feature"] ) pull_request.revisions = [commit_ids["new-feature"]] pull_request.title = "Test" pull_request.description = "Description" pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN) pull_request.pull_request_state = PullRequest.STATE_CREATED Session().add(pull_request) Session().commit() pull_request_id = pull_request.pull_request_id vcs = repo.scm_instance() vcs.remove_ref("refs/heads/{}".format(branch_name)) # NOTE(marcink): run GC to ensure the commits are gone vcs.run_gc() response = self.app.get( route_path( "pullrequest_show", repo_name=repo_name, pull_request_id=pull_request_id ) ) assert response.status_int == 200 response.assert_response().element_contains( "#changeset_compare_view_content .alert strong", "Missing commits" ) response.assert_response().element_contains( "#changeset_compare_view_content .alert", "This pull request cannot be displayed, because one or more" " commits no longer exist in the source repository.", ) def test_strip_commits_from_pull_request(self, backend, pr_util): commits = [ {"message": "initial-commit"}, {"message": "old-feature"}, {"message": "new-feature"}, ] pull_request = pr_util.create_pull_request( commits, target_head="initial-commit", source_head="new-feature", revisions=["new-feature"], ) vcs = pr_util.source_repository.scm_instance() if backend.alias == "git": vcs.strip(pr_util.commit_ids["new-feature"], branch_name=pr_util.backend.default_branch_name) else: vcs.strip(pr_util.commit_ids["new-feature"]) response = self.app.get( route_path( "pullrequest_show", repo_name=pr_util.target_repository.repo_name, pull_request_id=pull_request.pull_request_id, ) ) assert response.status_int == 200 response.assert_response().element_contains( "#changeset_compare_view_content .alert strong", "Missing commits" ) response.assert_response().element_contains( "#changeset_compare_view_content .alert", "This pull request cannot be displayed, because one or more" " commits no longer exist in the source repository.", ) response.assert_response().element_contains("#update_commits", "Update commits") def test_strip_commits_and_update(self, backend, pr_util, csrf_token): commits = [ {"message": "initial-commit"}, {"message": "old-feature"}, {"message": "new-feature", "parents": ["old-feature"]}, ] pull_request = pr_util.create_pull_request( commits, target_head="old-feature", source_head="new-feature", revisions=["new-feature"], mergeable=True, ) pr_id = pull_request.pull_request_id target_repo_name = pull_request.target_repo.repo_name vcs = pr_util.source_repository.scm_instance() if backend.alias == "git": vcs.strip(pr_util.commit_ids["new-feature"], branch_name="master") else: vcs.strip(pr_util.commit_ids["new-feature"]) url = route_path( "pullrequest_update", repo_name=target_repo_name, pull_request_id=pr_id ) response = self.app.post( url, params={"update_commits": "true", "csrf_token": csrf_token} ) assert response.status_int == 200 assert json.loads(response.body) == json.loads( '{"response": true, "redirect_url": null}' ) # Make sure that after update, it won't raise 500 errors response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo_name, pull_request_id=pr_id ) ) assert response.status_int == 200 response.assert_response().element_contains( "#changeset_compare_view_content .alert strong", "Missing commits" ) def test_branch_is_a_link(self, pr_util): pull_request = pr_util.create_pull_request() pull_request.source_ref = "branch:origin:1234567890abcdef" pull_request.target_ref = "branch:target:abcdef1234567890" Session().add(pull_request) Session().commit() response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) assert response.status_int == 200 source = response.assert_response().get_element(".pr-source-info") source_parent = source.getparent() assert len(source_parent) == 1 target = response.assert_response().get_element(".pr-target-info") target_parent = target.getparent() assert len(target_parent) == 1 expected_origin_link = route_path( "repo_commits", repo_name=pull_request.source_repo.scm_instance().name, params=dict(branch="origin"), ) expected_target_link = route_path( "repo_commits", repo_name=pull_request.target_repo.scm_instance().name, params=dict(branch="target"), ) assert source_parent.attrib["href"] == expected_origin_link assert target_parent.attrib["href"] == expected_target_link def test_bookmark_is_not_a_link(self, pr_util): pull_request = pr_util.create_pull_request() pull_request.source_ref = "bookmark:origin:1234567890abcdef" pull_request.target_ref = "bookmark:target:abcdef1234567890" Session().add(pull_request) Session().commit() response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) assert response.status_int == 200 source = response.assert_response().get_element(".pr-source-info") assert source.text.strip() == "bookmark:origin" assert source.getparent().attrib.get("href") is None target = response.assert_response().get_element(".pr-target-info") assert target.text.strip() == "bookmark:target" assert target.getparent().attrib.get("href") is None def test_tag_is_not_a_link(self, pr_util): pull_request = pr_util.create_pull_request() pull_request.source_ref = "tag:origin:1234567890abcdef" pull_request.target_ref = "tag:target:abcdef1234567890" Session().add(pull_request) Session().commit() response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) assert response.status_int == 200 source = response.assert_response().get_element(".pr-source-info") assert source.text.strip() == "tag:origin" assert source.getparent().attrib.get("href") is None target = response.assert_response().get_element(".pr-target-info") assert target.text.strip() == "tag:target" assert target.getparent().attrib.get("href") is None @pytest.mark.parametrize("mergeable", [True, False]) def test_shadow_repository_link(self, mergeable, pr_util, http_host_only_stub): """ Check that the pull request summary page displays a link to the shadow repository if the pull request is mergeable. If it is not mergeable the link should not be displayed. """ pull_request = pr_util.create_pull_request( mergeable=mergeable, enable_notifications=False ) target_repo = pull_request.target_repo.scm_instance() pr_id = pull_request.pull_request_id shadow_url = "{host}/{repo}/pull-request/{pr_id}/repository".format( host=http_host_only_stub, repo=target_repo.name, pr_id=pr_id ) response = self.app.get( route_path( "pullrequest_show", repo_name=target_repo.name, pull_request_id=pr_id ) ) if mergeable: response.assert_response().element_value_contains( "input.pr-mergeinfo", shadow_url ) response.assert_response().element_value_contains( "input.pr-mergeinfo ", "pr-merge" ) else: response.assert_response().no_element_exists(".pr-mergeinfo") @pytest.mark.usefixtures("app") @pytest.mark.backends("git", "hg") class TestPullrequestsControllerDelete(object): def test_pull_request_delete_button_permissions_admin( self, autologin_user, user_admin, pr_util ): pull_request = pr_util.create_pull_request( author=user_admin.username, enable_notifications=False ) response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) response.mustcontain('id="delete_pullrequest"') response.mustcontain("Confirm to delete this pull request") def test_pull_request_delete_button_permissions_owner( self, autologin_regular_user, user_regular, pr_util ): pull_request = pr_util.create_pull_request( author=user_regular.username, enable_notifications=False ) response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) response.mustcontain('id="delete_pullrequest"') response.mustcontain("Confirm to delete this pull request") def test_pull_request_delete_button_permissions_forbidden( self, autologin_regular_user, user_regular, user_admin, pr_util ): pull_request = pr_util.create_pull_request( author=user_admin.username, enable_notifications=False ) response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) response.mustcontain(no=['id="delete_pullrequest"']) response.mustcontain(no=["Confirm to delete this pull request"]) def test_pull_request_delete_button_permissions_can_update_cannot_delete( self, autologin_regular_user, user_regular, user_admin, pr_util, user_util ): pull_request = pr_util.create_pull_request( author=user_admin.username, enable_notifications=False ) user_util.grant_user_permission_to_repo( pull_request.target_repo, user_regular, "repository.write" ) response = self.app.get( route_path( "pullrequest_show", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, ) ) response.mustcontain('id="open_edit_pullrequest"') response.mustcontain('id="delete_pullrequest"') response.mustcontain(no=["Confirm to delete this pull request"]) def test_delete_comment_returns_404_if_comment_does_not_exist( self, autologin_user, pr_util, user_admin, csrf_token, xhr_header ): pull_request = pr_util.create_pull_request( author=user_admin.username, enable_notifications=False ) self.app.post( route_path( "pullrequest_comment_delete", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, comment_id=1024404, ), extra_environ=xhr_header, params={"csrf_token": csrf_token}, status=404, ) def test_delete_comment( self, autologin_user, pr_util, user_admin, csrf_token, xhr_header ): pull_request = pr_util.create_pull_request( author=user_admin.username, enable_notifications=False ) comment = pr_util.create_comment() comment_id = comment.comment_id response = self.app.post( route_path( "pullrequest_comment_delete", repo_name=pull_request.target_repo.scm_instance().name, pull_request_id=pull_request.pull_request_id, comment_id=comment_id, ), extra_environ=xhr_header, params={"csrf_token": csrf_token}, status=200, ) assert response.text == "true" @pytest.mark.parametrize( "url_type", [ "pullrequest_new", "pullrequest_create", "pullrequest_update", "pullrequest_merge", ], ) def test_pull_request_is_forbidden_on_archived_repo( self, autologin_user, backend, xhr_header, user_util, url_type ): # create a temporary repo source = user_util.create_repo(repo_type=backend.alias) repo_name = source.repo_name repo = Repository.get_by_repo_name(repo_name) repo.archived = True Session().commit() response = self.app.get( route_path(url_type, repo_name=repo_name, pull_request_id=1), status=302 ) msg = "Action not supported for archived repository." assert_session_flash(response, msg) def assert_pull_request_status(pull_request, expected_status): status = ChangesetStatusModel().calculated_review_status(pull_request=pull_request) assert status == expected_status @pytest.mark.parametrize("route", ["pullrequest_new", "pullrequest_create"]) @pytest.mark.usefixtures("autologin_user") def test_forbidde_to_repo_summary_for_svn_repositories(backend_svn, app, route): app.get(route_path(route, repo_name=backend_svn.repo_name), status=404)