test_repo_pullrequests.py
1983 lines
| 72.9 KiB
| text/x-python
|
PythonLexer
r5608 | # Copyright (C) 2010-2024 RhodeCode GmbH | |||
r1974 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r5607 | import logging | |||
import os | ||||
r1974 | import mock | |||
import pytest | ||||
import rhodecode | ||||
r4863 | from rhodecode.lib import helpers as h | |||
r5198 | from rhodecode.lib.vcs.backends.base import MergeResponse, MergeFailureReason, Reference | |||
r1974 | from rhodecode.lib.vcs.nodes import FileNode | |||
r4863 | from rhodecode.lib.ext_json import json | |||
r1974 | from rhodecode.model.changeset_status import ChangesetStatusModel | |||
from rhodecode.model.db import ( | ||||
r5198 | PullRequest, | |||
ChangesetStatus, | ||||
UserLog, | ||||
Notification, | ||||
ChangesetComment, | ||||
Repository, | ||||
) | ||||
r1974 | from rhodecode.model.meta import Session | |||
from rhodecode.model.pull_request import PullRequestModel | ||||
from rhodecode.model.user import UserModel | ||||
r4401 | from rhodecode.model.comment import CommentsModel | |||
r1974 | from rhodecode.tests import ( | |||
r5198 | assert_session_flash, | |||
TEST_USER_ADMIN_LOGIN, | ||||
TEST_USER_REGULAR_LOGIN, | ||||
) | ||||
r5607 | from rhodecode.tests.fixtures.fixture_utils import PRTestUtility | |||
r5173 | from rhodecode.tests.routes import route_path | |||
r1974 | ||||
r5198 | @pytest.mark.usefixtures("app", "autologin_user") | |||
r1974 | @pytest.mark.backends("git", "hg") | |||
class TestPullrequestsView(object): | ||||
def test_index(self, backend):
    """Smoke test: request the "new pull request" page of the test repo.

    No assertion is made on the body; the test only checks that the
    request goes through.
    """
    new_pr_url = route_path("pullrequest_new", repo_name=backend.repo_name)
    self.app.get(new_pr_url)
r1974 | ||||
def test_option_menu_create_pull_request_exists(self, backend):
    """The repository summary page must link to the PR creation form."""
    name = backend.repo_name

    # Build the exact anchor markup we expect to find on the summary page.
    new_pr_url = route_path("pullrequest_new", repo_name=name)
    expected_link = '<a href="%s">Create Pull Request</a>' % new_pr_url

    summary_page = self.app.get(h.route_path("repo_summary", repo_name=name))
    summary_page.mustcontain(expected_link)
def test_create_pr_form_with_raw_commit_id(self, backend):
    """The PR creation form answers 200 when given a raw commit id."""
    repo = backend.repo
    form_url = route_path(
        "pullrequest_new",
        repo_name=repo.repo_name,
        commit=repo.get_commit().raw_id,
    )
    self.app.get(form_url, status=200)
r1974 | ||||
@pytest.mark.parametrize("pr_merge_enabled", [True, False])
@pytest.mark.parametrize("range_diff", ["0", "1"])
def test_show(self, pr_util, pr_merge_enabled, range_diff):
    """Render the pull request page and check its key fragments.

    Runs for every combination of server-side merging on/off and the
    ``range-diff`` request parameter set to "0"/"1".
    """
    pull_request = pr_util.create_pull_request(
        mergeable=pr_merge_enabled, enable_notifications=False
    )
    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
        params={"range-diff": range_diff},
    )
    page = self.app.get(show_url)

    # Every revision of the PR and the target reference must be shown.
    for commit_id in pull_request.revisions:
        page.mustcontain(commit_id)
    page.mustcontain(pull_request.target_ref_parts.type)
    page.mustcontain(pull_request.target_ref_parts.name)

    # The merge box is always rendered; its message depends on whether
    # server-side merging is enabled for this PR.
    page.mustcontain('class="pull-request-merge"')
    merge_notice = (
        "Pull request reviewer approval is pending"
        if pr_merge_enabled
        else "Server-side pull request merging is disabled."
    )
    page.mustcontain(merge_notice)

    if range_diff == "1":
        page.mustcontain("Turn off: Show the diff as commit range")
r1974 | ||||
def test_show_versions_of_pr(self, backend, csrf_token):
    """Walk a pull request through three updates and check each version view.

    The PR is opened with a single commit ("commit-1"). The source repo then
    pulls "commit-2", "commit-3" and "commit-4" one at a time, with a PR
    update after each pull. Afterwards the test checks:

    * the latest view (all four commits and all four added lines),
    * the view of each stored version,
    * the diff from the first stored version to each later state,
    * the diff between the second and third stored versions.
    """
    commits = [
        {
            "message": "initial-commit",
            "added": [FileNode(b"test-file.txt", b"LINE1\n")],
        },
        {
            "message": "commit-1",
            "changed": [FileNode(b"test-file.txt", b"LINE1\nLINE2\n")],
        },
        # The two commits above form the initial version of the PR, which
        # changes a single line. Each of the next three commits adds
        # another line; they are pulled in one per PR update.
        {
            "message": "commit-2",
            "changed": [FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\n")],
        },
        {
            "message": "commit-3",
            "changed": [
                FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\nLINE4\n")
            ],
        },
        {
            "message": "commit-4",
            "changed": [
                FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\nLINE4\nLINE5\n")
            ],
        },
    ]
    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["initial-commit"])
    source = backend.create_repo(heads=["commit-1"])
    source_repo_name = source.repo_name
    target_repo_name = target.repo_name
    target_ref = "branch:{branch}:{commit_id}".format(
        branch=backend.default_branch_name, commit_id=commit_ids["initial-commit"]
    )
    source_ref = "branch:{branch}:{commit_id}".format(
        branch=backend.default_branch_name, commit_id=commit_ids["commit-1"]
    )

    response = self.app.post(
        route_path("pullrequest_create", repo_name=source_repo_name),
        [
            ("source_repo", source_repo_name),
            ("source_ref", source_ref),
            ("target_repo", target_repo_name),
            ("target_ref", target_ref),
            ("common_ancestor", commit_ids["initial-commit"]),
            ("pullrequest_title", "Title"),
            ("pullrequest_desc", "Description"),
            ("description_renderer", "markdown"),
            ("__start__", "review_members:sequence"),
            ("__start__", "reviewer:mapping"),
            ("user_id", "1"),
            ("__start__", "reasons:sequence"),
            ("reason", "Some reason"),
            ("__end__", "reasons:sequence"),
            ("__start__", "rules:sequence"),
            ("__end__", "rules:sequence"),
            ("mandatory", "False"),
            ("__end__", "reviewer:mapping"),
            ("__end__", "review_members:sequence"),
            ("__start__", "revisions:sequence"),
            ("revisions", commit_ids["commit-1"]),
            ("__end__", "revisions:sequence"),
            ("user", ""),
            ("csrf_token", csrf_token),
        ],
        status=302,
    )

    # The redirect target ends with the id of the newly created PR; ending
    # in "new" would mean we were sent back to the creation form.
    location = response.headers["Location"]
    pull_request_id = location.rsplit("/", 1)[1]
    assert pull_request_id != "new"
    pull_request = PullRequest.get(int(pull_request_id))
    pull_request_id = pull_request.pull_request_id

    def show(**params):
        """GET the PR page, passing ``params`` only when some are given."""
        route_kwargs = {
            "repo_name": target_repo_name,
            "pull_request_id": pull_request_id,
        }
        if params:
            route_kwargs["params"] = params
        return self.app.get(route_path("pullrequest_show", **route_kwargs))

    def cb_line(text):
        """Markup fragment of an added diff line containing ``text``."""
        return 'cb-addition"></span><span>{}</span>'.format(text)

    def cb_context(text):
        """Markup fragment of a context (unchanged) diff line."""
        return (
            '<span class="cb-code"><span class="cb-action cb-context">'
            "</span><span>{}</span></span>".format(text)
        )

    # Initial version of the PR: only commit-1 / LINE2 are visible.
    response = show()
    response.mustcontain("commit-1")
    for absent in ("commit-2", "commit-3", "commit-4"):
        response.mustcontain(no=[absent])
    response.mustcontain(cb_line("LINE2"))
    for absent in ("LINE3", "LINE4", "LINE5"):
        response.mustcontain(no=[absent])

    # Three PR updates, each after pulling one more commit into the source.
    for head in ("commit-2", "commit-3", "commit-4"):
        source_repo = Repository.get_by_repo_name(source_repo_name)
        backend.pull_heads(source_repo, heads=[head])
        self.app.post(
            route_path(
                "pullrequest_update",
                repo_name=target_repo_name,
                pull_request_id=pull_request_id,
            ),
            params={"update_commits": "true", "csrf_token": csrf_token},
        )

    # Final (latest) state: 3 updates on top of the initial one == 4 versions.
    response = show()
    response.mustcontain("4 versions available for this pull request")
    response.mustcontain(no=["rhodecode diff rendering error"])

    # The latest view must list all four commits and all four added lines.
    for message in ("commit-1", "commit-2", "commit-3", "commit-4"):
        response.mustcontain(message)
    for added in ("LINE2", "LINE3", "LINE4", "LINE5"):
        response.mustcontain(cb_line(added))

    # Three versions are stored; the fourth, latest state is the PR itself.
    pr = PullRequest.get(pull_request_id)
    versions = [x.pull_request_version_id for x in pr.versions.all()]
    assert len(versions) == 3

    # Per stored version: (must be in response, must not be in response).
    commit_tests = {
        1: (["commit-1"], ["commit-2", "commit-3", "commit-4"]),
        2: (["commit-1", "commit-2"], ["commit-3", "commit-4"]),
        3: (["commit-1", "commit-2", "commit-3"], ["commit-4"]),
    }
    diff_tests = {
        1: (["LINE2"], ["LINE3", "LINE4", "LINE5"]),
        2: (["LINE2", "LINE3"], ["LINE4", "LINE5"]),
        3: (["LINE2", "LINE3", "LINE4"], ["LINE5"]),
    }
    for idx, ver in enumerate(versions, 1):
        response = show(version=ver)

        response.mustcontain(no=["rhodecode diff rendering error"])
        response.mustcontain("Showing changes at v{}".format(idx))

        present, absent = commit_tests[idx]
        for item in present:
            response.mustcontain(item)
        for item in absent:
            response.mustcontain(no=[item])

        present, absent = diff_tests[idx]
        for item in present:
            response.mustcontain(cb_line(item))
        for item in absent:
            response.mustcontain(no=[item])

    # Diff from the first stored version to each later state:
    # (lines shown as additions, lines shown as context).
    diff_compare_tests = {
        1: (["LINE3"], ["LINE1", "LINE2"]),
        2: (["LINE3", "LINE4"], ["LINE1", "LINE2"]),
        3: (["LINE3", "LINE4", "LINE5"], ["LINE1", "LINE2"]),
    }
    for idx, ver in enumerate(versions, 1):
        adds, context = diff_compare_tests[idx]
        # NOTE(review): ``ver + 1`` assumes stored version ids are
        # consecutive; the last stored version is compared with "latest".
        to_ver = "latest" if idx == 3 else ver + 1

        response = show(from_version=versions[0], version=to_ver)
        response.mustcontain(no=["rhodecode diff rendering error"])

        for added in adds:
            response.mustcontain(cb_line(added))
        for unchanged in context:
            response.mustcontain(cb_context(unchanged))

    # Diff between the second and third stored versions: only LINE4 is new.
    response = show(from_version=versions[1], version=versions[2])
    response.mustcontain(cb_context("LINE1"))
    response.mustcontain(cb_context("LINE2"))
    response.mustcontain(cb_context("LINE3"))
    response.mustcontain(cb_line("LINE4"))
r4426 | ||||
def test_close_status_visibility(self, pr_util, user_util, csrf_token):
    """The close action shows only once the user has write permission.

    The test logs in as the regular test user, creates a PR authored by
    that user, and checks that the ``#close-pull-request-action`` element
    is missing before ``repository.write`` is granted on the target
    repository and present afterwards.
    """
    # Swap the current session for the regular test user.
    self.app.post(h.route_path("logout"), params={"csrf_token": csrf_token})
    self.app.post(
        h.route_path("login"),
        {"username": TEST_USER_REGULAR_LOGIN, "password": "test12"},
    )
    pull_request = pr_util.create_pull_request(author=TEST_USER_REGULAR_LOGIN)

    def load_pr_page():
        """Fetch the PR page, check the merge notice, return the asserter."""
        page = self.app.get(
            route_path(
                "pullrequest_show",
                repo_name=pull_request.target_repo.scm_instance().name,
                pull_request_id=pull_request.pull_request_id,
            )
        )
        page.mustcontain("Server-side pull request merging is disabled.")
        return page.assert_response()

    # A regular user without merge permissions does not see the action.
    load_pr_page().no_element_exists("#close-pull-request-action")

    user_util.grant_user_permission_to_repo(
        pull_request.target_repo,
        UserModel().get_by_username(TEST_USER_REGULAR_LOGIN),
        "repository.write",
    )

    # With write permission granted, the close action is rendered.
    load_pr_page().one_element_exists("#close-pull-request-action")
r1974 | ||||
def test_show_invalid_commit_id(self, pr_util):
    """The PR page still renders when a stored revision is invalid."""
    pull_request = pr_util.create_pull_request()

    # Persist a bogus revision list to simulate a commit lookup error.
    pull_request.revisions = ["invalid"]
    Session().add(pull_request)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    page = self.app.get(show_url)

    # The invalid id is still displayed on the page.
    for revision in pull_request.revisions:
        page.mustcontain(revision)
def test_show_invalid_source_reference(self, pr_util):
    """Request the PR page after storing an invalid source reference.

    No assertion is made on the body; the test only checks that the
    request goes through.
    """
    pull_request = pr_util.create_pull_request()

    # Persist a source reference whose commit id does not exist.
    pull_request.source_ref = "branch:b:invalid"
    Session().add(pull_request)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    self.app.get(show_url)
r1974 | ||||
def test_edit_title_description(self, pr_util, csrf_token):
    """Posting a new title and description updates the stored PR."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id

    update_url = route_path(
        "pullrequest_update",
        repo_name=pull_request.target_repo.repo_name,
        pull_request_id=pull_request_id,
    )
    form = {
        "edit_pull_request": "true",
        "title": "New title",
        "description": "New description",
        "csrf_token": csrf_token,
    }
    response = self.app.post(update_url, params=form)

    assert_session_flash(
        response, "Pull request title & description updated.", category="success"
    )

    # Re-read from the database to verify the change was persisted.
    reloaded = PullRequest.get(pull_request_id)
    assert reloaded.title == "New title"
    assert reloaded.description == "New description"
r1974 | ||||
def test_edit_title_description_special(self, pr_util, csrf_token):
    """A title with ``{}`` placeholders gets its braces doubled in ``title_safe``."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id

    update_url = route_path(
        "pullrequest_update",
        repo_name=pull_request.target_repo.repo_name,
        pull_request_id=pull_request_id,
    )
    form = {
        "edit_pull_request": "true",
        "title": "New title {} {2} {foo}",
        "description": "New description",
        "csrf_token": csrf_token,
    }
    response = self.app.post(update_url, params=form)

    assert_session_flash(
        response, "Pull request title & description updated.", category="success"
    )

    reloaded = PullRequest.get(pull_request_id)
    assert reloaded.title_safe == "New title {{}} {{2}} {{foo}}"
r4631 | ||||
def test_edit_title_description_closed(self, pr_util, csrf_token):
    """Editing a closed pull request is refused with an error flash."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id
    # Read the repo name before the PR is closed.
    repo_name = pull_request.target_repo.repo_name
    pr_util.close()

    update_url = route_path(
        "pullrequest_update",
        repo_name=repo_name,
        pull_request_id=pull_request_id,
    )
    form = {
        "edit_pull_request": "true",
        "title": "New title",
        "description": "New description",
        "csrf_token": csrf_token,
    }
    response = self.app.post(update_url, params=form, status=200)

    assert_session_flash(
        response, "Cannot update closed pull requests.", category="error"
    )
r1974 | ||||
def test_update_invalid_source_reference(self, pr_util, csrf_token):
    """Updating commits with a missing source ref flashes the matching error."""
    from rhodecode.lib.vcs.backends.base import UpdateFailureReason

    pull_request = pr_util.create_pull_request()

    # Point the PR at a source branch/commit that does not exist.
    pull_request.source_ref = "branch:invalid-branch:invalid-commit-id"
    Session().add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    update_url = route_path(
        "pullrequest_update",
        repo_name=pull_request.target_repo.repo_name,
        pull_request_id=pull_request_id,
    )
    response = self.app.post(
        update_url,
        params={"update_commits": "true", "csrf_token": csrf_token},
    )

    # The flash must carry the model's message for MISSING_SOURCE_REF.
    status_messages = PullRequestModel.UPDATE_STATUS_MESSAGES
    expected_msg = str(status_messages[UpdateFailureReason.MISSING_SOURCE_REF])
    assert_session_flash(response, expected_msg, category="error")
r1974 | ||||
def test_missing_target_reference(self, pr_util, csrf_token):
    """The merge box reports a missing target reference.

    The PR's target ref is replaced with a non-existent one. The page's
    ``div[data-role="merge-message"]`` element must then contain the status
    message of a ``MergeResponse`` built with
    ``MergeFailureReason.MISSING_TARGET_REF`` for that reference.
    """
    pull_request = pr_util.create_pull_request(approved=True, mergeable=True)

    unicode_reference = "branch:invalid-branch:invalid-commit-id"
    pull_request.target_ref = unicode_reference
    Session().add(pull_request)
    Session().commit()

    pull_request_id = pull_request.pull_request_id
    pull_request_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.repo_name,
        pull_request_id=pull_request_id,
    )
    response = self.app.get(pull_request_url)

    # Build the response object whose message the page should display; the
    # stub commit reference is a placeholder.
    merge_resp = MergeResponse(
        True,
        True,
        Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"),
        MergeFailureReason.MISSING_TARGET_REF,
        metadata={
            "target_ref": PullRequest.unicode_to_reference(unicode_reference)
        },
    )
    response.assert_response().element_contains(
        'div[data-role="merge-message"]', merge_resp.merge_status_message
    )
r1974 | ||||
def test_comment_and_close_pull_request_custom_message_approved(
    self, pr_util, csrf_token, xhr_header
):
    """Closing an approved PR with custom text keeps it approved.

    After posting a closing comment with text, the test checks that the
    last journal entry is the close action, the PR is closed, its status is
    still APPROVED, and the last comment carries the submitted text.
    """
    pull_request = pr_util.create_pull_request(approved=True)
    pull_request_id = pull_request.pull_request_id
    author = pull_request.user_id
    repo = pull_request.target_repo.repo_id

    self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=pull_request.target_repo.scm_instance().name,
            pull_request_id=pull_request_id,
        ),
        params={
            "close_pull_request": "1",
            "text": "Closing a PR",
            "csrf_token": csrf_token,
        },
        extra_environ=xhr_header,
    )

    # The newest journal entry of the author on this repo is the close.
    journal = (
        UserLog.query()
        .filter(UserLog.user_id == author)
        .filter(UserLog.repository_id == repo)
        .order_by(UserLog.user_log_id.asc())
        .all()
    )
    assert journal[-1].action == "repo.pull_request.close"

    pull_request = PullRequest.get(pull_request_id)
    assert pull_request.is_closed()

    status = ChangesetStatusModel().get_status(
        pull_request.source_repo, pull_request=pull_request
    )
    assert status == ChangesetStatus.STATUS_APPROVED

    # ``query`` is called on the class, as everywhere else in this module;
    # no throwaway model instance is needed.
    comments = (
        ChangesetComment.query()
        .filter(ChangesetComment.pull_request == pull_request)
        .order_by(ChangesetComment.comment_id.asc())
        .all()
    )
    assert comments[-1].text == "Closing a PR"
r1974 | ||||
def test_comment_force_close_pull_request_rejected(
    self, pr_util, csrf_token, xhr_header
):
    """Closing a PR with two assigned reviewers ends in REJECTED.

    Two reviewers are assigned before the close request is posted. The
    test then checks the journal's last entry and the latest status.
    """
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id

    reviewers = [
        (1, ["reason"], False, "reviewer", []),
        (2, ["reason2"], False, "reviewer", []),
    ]
    PullRequestModel().update_reviewers(
        pull_request_id, reviewers, pull_request.author
    )

    author = pull_request.user_id
    repo = pull_request.target_repo.repo_id
    close_url = route_path(
        "pullrequest_comment_create",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        close_url,
        params={"close_pull_request": "1", "csrf_token": csrf_token},
        extra_environ=xhr_header,
    )

    pull_request = PullRequest.get(pull_request_id)

    # The newest journal entry of the author on this repo is the close.
    journal_query = UserLog.query().filter(
        UserLog.user_id == author, UserLog.repository_id == repo
    )
    journal = journal_query.order_by(UserLog.user_log_id.asc()).all()
    assert journal[-1].action == "repo.pull_request.close"

    # check only the latest status, not the review status
    status = ChangesetStatusModel().get_status(
        pull_request.source_repo, pull_request=pull_request
    )
    assert status == ChangesetStatus.STATUS_REJECTED
def test_comment_and_close_pull_request(self, pr_util, csrf_token, xhr_header):
    """Closing a freshly created PR via the comment endpoint rejects it."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id

    close_url = route_path(
        "pullrequest_comment_create",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.post(
        close_url,
        params={"close_pull_request": "true", "csrf_token": csrf_token},
        extra_environ=xhr_header,
    )

    # The endpoint must answer with a non-empty JSON payload.
    assert response.json

    pull_request = PullRequest.get(pull_request_id)
    assert pull_request.is_closed()

    # check only the latest status, not the review status
    status = ChangesetStatusModel().get_status(
        pull_request.source_repo, pull_request=pull_request
    )
    assert status == ChangesetStatus.STATUS_REJECTED
def test_comment_and_close_pull_request_try_edit_comment(
    self, pr_util, csrf_token, xhr_header
):
    """Comments of a closed pull request cannot be edited (403)."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id
    target_scm_name = pull_request.target_repo.scm_instance().name

    close_response = self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=target_scm_name,
            pull_request_id=pull_request_id,
        ),
        params={
            "close_pull_request": "true",
            "csrf_token": csrf_token,
        },
        extra_environ=xhr_header,
    )
    assert close_response.json

    # Reload the PR and re-read the target repo name from the fresh object.
    pull_request = PullRequest.get(pull_request_id)
    target_scm_name = pull_request.target_repo.scm_instance().name
    assert pull_request.is_closed()

    # check only the latest status, not the review status
    status = ChangesetStatusModel().get_status(
        pull_request.source_repo, pull_request=pull_request
    )
    assert status == ChangesetStatus.STATUS_REJECTED

    # Every comment created by the close request must refuse the edit.
    for comment_id in close_response.json:
        test_text = "test"
        edit_response = self.app.post(
            route_path(
                "pullrequest_comment_edit",
                repo_name=target_scm_name,
                pull_request_id=pull_request_id,
                comment_id=comment_id,
            ),
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": test_text,
            },
            status=403,
        )
        assert edit_response.status_int == 403
r4401 | ||||
def test_comment_and_comment_edit(self, pr_util, csrf_token, xhr_header):
    """Editing a just-created comment stores the new text."""
    pull_request = pr_util.create_pull_request()
    target_scm_name = pull_request.target_repo.scm_instance().name

    create_response = self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
        ),
        params={
            "csrf_token": csrf_token,
            "text": "init",
        },
        extra_environ=xhr_header,
    )
    assert create_response.json

    # The JSON payload is keyed by comment id; edit each created comment.
    for comment_id in create_response.json:
        assert comment_id
        test_text = "test"
        self.app.post(
            route_path(
                "pullrequest_comment_edit",
                repo_name=target_scm_name,
                pull_request_id=pull_request.pull_request_id,
                comment_id=comment_id,
            ),
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": test_text,
                "version": "0",
            },
        )

        # Read the text back from the database to confirm the edit.
        stored_comment = (
            ChangesetComment.query()
            .filter(ChangesetComment.comment_id == comment_id)
            .first()
        )
        text_form_db = stored_comment.text
        assert test_text == text_form_db
r4401 | ||||
def test_comment_and_comment_edit_special(self, pr_util, csrf_token, xhr_header):
    """Editing a comment with its unchanged text ("init") answers 404."""
    pull_request = pr_util.create_pull_request()
    target_scm_name = pull_request.target_repo.scm_instance().name

    create_response = self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
        ),
        params={
            "csrf_token": csrf_token,
            "text": "init",
        },
        extra_environ=xhr_header,
    )
    assert create_response.json

    for comment_id in create_response.json:
        # Same text as the one the comment was created with.
        test_text = "init"
        edit_response = self.app.post(
            route_path(
                "pullrequest_comment_edit",
                repo_name=target_scm_name,
                pull_request_id=pull_request.pull_request_id,
                comment_id=comment_id,
            ),
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": test_text,
                "version": "0",
            },
            status=404,
        )
        assert edit_response.status_int == 404
r4401 | ||||
def test_comment_and_try_edit_already_edited(self, pr_util, csrf_token, xhr_header):
    """A second edit that re-sends version "0" is rejected with 409.

    The first edit (version "0") goes through; the second edit sends the
    same version again and must not replace the stored text.
    """
    pull_request = pr_util.create_pull_request()
    target_scm_name = pull_request.target_repo.scm_instance().name

    create_response = self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
        ),
        params={
            "csrf_token": csrf_token,
            "text": "init",
        },
        extra_environ=xhr_header,
    )
    assert create_response.json

    for comment_id in create_response.json:
        edit_url = route_path(
            "pullrequest_comment_edit",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
            comment_id=comment_id,
        )

        # First edit against version 0.
        test_text = "test"
        self.app.post(
            edit_url,
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": test_text,
                "version": "0",
            },
        )

        # Second edit re-uses version 0 and must be refused.
        test_text_v2 = "test_v2"
        conflict_response = self.app.post(
            edit_url,
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": test_text_v2,
                "version": "0",
            },
            status=409,
        )
        assert conflict_response.status_int == 409

        # The stored text is still the one from the first edit.
        stored_comment = (
            ChangesetComment.query()
            .filter(ChangesetComment.comment_id == comment_id)
            .first()
        )
        text_form_db = stored_comment.text

        assert test_text == text_form_db
        assert test_text_v2 != text_form_db
r4401 | ||||
def test_comment_and_comment_edit_permissions_forbidden(
    self,
    autologin_regular_user,
    user_regular,
    user_admin,
    pr_util,
    csrf_token,
    xhr_header,
):
    """
    Editing a comment created by ``user_admin`` while the
    ``autologin_regular_user`` fixture is active must be refused with 403.
    """
    pull_request = pr_util.create_pull_request(
        author=user_admin.username, enable_notifications=False
    )
    repo_name = pull_request.target_repo.scm_instance().name

    # The comment is written directly through the model, as the admin user.
    admin_comment = CommentsModel().create(
        text="test",
        repo=repo_name,
        user=user_admin,
        pull_request=pull_request,
    )

    edit_url = route_path(
        "pullrequest_comment_edit",
        repo_name=repo_name,
        pull_request_id=pull_request.pull_request_id,
        comment_id=admin_comment.comment_id,
    )
    edit_payload = {"csrf_token": csrf_token, "text": "test_text"}

    response = self.app.post(
        edit_url,
        extra_environ=xhr_header,
        params=edit_payload,
        status=403,
    )
    assert response.status_int == 403
def test_create_pull_request(self, backend, csrf_token):
    """
    Submit the "create pull request" form and verify that the stored pull
    request carries both revisions and the submitted source/target refs.
    """
    commit_ids = backend.create_master_repo(
        [{"message": msg} for msg in ("ancestor", "change", "change2")]
    )
    target = backend.create_repo(heads=["ancestor"])
    source = backend.create_repo(heads=["change2"])

    source_ref = f"branch:default:{commit_ids['change2']}"
    target_ref = f"branch:default:{commit_ids['ancestor']}"

    # The form is posted as an ordered list of pairs; the __start__/__end__
    # markers delimit the nested reviewer and revision sequences.
    general_fields = [
        ("source_repo", source.repo_name),
        ("source_ref", source_ref),
        ("target_repo", target.repo_name),
        ("target_ref", target_ref),
        ("common_ancestor", commit_ids["ancestor"]),
        ("pullrequest_title", "Title"),
        ("pullrequest_desc", "Description"),
        ("description_renderer", "markdown"),
    ]
    reviewer_fields = [
        ("__start__", "review_members:sequence"),
        ("__start__", "reviewer:mapping"),
        ("user_id", "1"),
        ("__start__", "reasons:sequence"),
        ("reason", "Some reason"),
        ("__end__", "reasons:sequence"),
        ("__start__", "rules:sequence"),
        ("__end__", "rules:sequence"),
        ("mandatory", "False"),
        ("__end__", "reviewer:mapping"),
        ("__end__", "review_members:sequence"),
    ]
    revision_fields = [
        ("__start__", "revisions:sequence"),
        ("revisions", commit_ids["change"]),
        ("revisions", commit_ids["change2"]),
        ("__end__", "revisions:sequence"),
    ]
    trailing_fields = [
        ("user", ""),
        ("csrf_token", csrf_token),
    ]

    response = self.app.post(
        route_path("pullrequest_create", repo_name=source.repo_name),
        general_fields + reviewer_fields + revision_fields + trailing_fields,
        status=302,
    )

    # The redirect target ends with the id of the freshly created PR.
    redirect_to = response.headers["Location"]
    pull_request_id = redirect_to.rsplit("/", 1)[1]
    assert pull_request_id != "new"

    pull_request = PullRequest.get(int(pull_request_id))
    # both revisions must be recorded, newest first
    assert pull_request.revisions == [commit_ids["change2"], commit_ids["change"]]
    assert pull_request.source_ref == source_ref
    assert pull_request.target_ref == target_ref
def test_reviewer_notifications(self, backend, csrf_token):
    """
    Creating a pull request through the web form produces one review-request
    notification; changing the reviewers afterwards produces a second one.
    """
    # The PR has to be created via app.post so that the notifications are
    # generated the same way as for a real, newly opened pull request.
    commits = [
        {
            "message": "ancestor",
            "added": [FileNode(b"file_A", content=b"content_of_ancestor")],
        },
        {
            "message": "change",
            "added": [FileNode(b"file_a", content=b"content_of_change")],
        },
        {"message": "change-child"},
        {
            "message": "ancestor-child",
            "parents": ["ancestor"],
            "branch": "feature",
            "added": [FileNode(b"file_c", content=b"content_of_ancestor_child")],
        },
        {"message": "ancestor-child-2", "branch": "feature"},
    ]
    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["ancestor-child"])
    source = backend.create_repo(heads=["change"])

    general_fields = [
        ("source_repo", source.repo_name),
        ("source_ref", f"branch:default:{commit_ids['change']}"),
        ("target_repo", target.repo_name),
        ("target_ref", f"branch:default:{commit_ids['ancestor-child']}"),
        ("common_ancestor", commit_ids["ancestor"]),
        ("pullrequest_title", "Title"),
        ("pullrequest_desc", "Description"),
        ("description_renderer", "markdown"),
    ]
    reviewer_fields = [
        ("__start__", "review_members:sequence"),
        ("__start__", "reviewer:mapping"),
        ("user_id", "2"),
        ("__start__", "reasons:sequence"),
        ("reason", "Some reason"),
        ("__end__", "reasons:sequence"),
        ("__start__", "rules:sequence"),
        ("__end__", "rules:sequence"),
        ("mandatory", "False"),
        ("__end__", "reviewer:mapping"),
        ("__end__", "review_members:sequence"),
    ]
    revision_fields = [
        ("__start__", "revisions:sequence"),
        ("revisions", commit_ids["change"]),
        ("__end__", "revisions:sequence"),
    ]
    trailing_fields = [
        ("user", ""),
        ("csrf_token", csrf_token),
    ]

    response = self.app.post(
        route_path("pullrequest_create", repo_name=source.repo_name),
        general_fields + reviewer_fields + revision_fields + trailing_fields,
        status=302,
    )

    redirect_to = response.headers["Location"]
    pull_request_id = redirect_to.rsplit("/", 1)[1]
    assert pull_request_id != "new"
    pull_request = PullRequest.get(int(pull_request_id))

    # Query matching the review-request notifications of this pull request;
    # it is re-executed after the reviewers change.
    expected_subject = f"requested a pull request review. !{pull_request_id}"
    review_notifications = Notification.query().filter(
        Notification.created_by == pull_request.author.user_id,
        Notification.type_ == Notification.TYPE_PULL_REQUEST,
        Notification.subject.contains(expected_subject),
    )
    assert len(review_notifications.all()) == 1

    # Swap the reviewers and expect one more notification.
    PullRequestModel().update_reviewers(
        pull_request.pull_request_id,
        [(1, [], False, "reviewer", [])],
        pull_request.author,
    )
    assert len(review_notifications.all()) == 2
def test_create_pull_request_stores_ancestor_commit_id(self, backend, csrf_token):
    """
    A pull request whose target has moved past the common ancestor must store
    the ancestor commit id in ``target_ref`` and render a diff that only
    contains the source-side change.
    """
    commits = [
        {
            "message": "ancestor",
            "added": [FileNode(b"file_A", content=b"content_of_ancestor")],
        },
        {
            "message": "change",
            "added": [FileNode(b"file_a", content=b"content_of_change")],
        },
        {
            "message": "change-child",
            "added": [FileNode(b"file_c", content=b"content_of_change_2")],
        },
        {
            "message": "ancestor-child",
            "parents": ["ancestor"],
            "branch": "feature",
            "added": [FileNode(b"file_B", content=b"content_of_ancestor_child")],
        },
        {"message": "ancestor-child-2", "branch": "feature"},
    ]
    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["ancestor-child"])
    source = backend.create_repo(heads=["change"])

    response = self.app.post(
        route_path("pullrequest_create", repo_name=source.repo_name),
        [
            ("source_repo", source.repo_name),
            ("source_ref", "branch:default:" + commit_ids["change"]),
            ("target_repo", target.repo_name),
            ("target_ref", "branch:default:" + commit_ids["ancestor-child"]),
            ("common_ancestor", commit_ids["ancestor"]),
            ("pullrequest_title", "Title"),
            ("pullrequest_desc", "Description"),
            ("description_renderer", "markdown"),
            ("__start__", "review_members:sequence"),
            ("__start__", "reviewer:mapping"),
            ("user_id", "1"),
            ("__start__", "reasons:sequence"),
            ("reason", "Some reason"),
            ("__end__", "reasons:sequence"),
            ("__start__", "rules:sequence"),
            ("__end__", "rules:sequence"),
            ("mandatory", "False"),
            ("__end__", "reviewer:mapping"),
            ("__end__", "review_members:sequence"),
            ("__start__", "revisions:sequence"),
            ("revisions", commit_ids["change"]),
            ("__end__", "revisions:sequence"),
            ("user", ""),
            ("csrf_token", csrf_token),
        ],
        status=302,
    )

    location = response.headers["Location"]

    pull_request_id = location.rsplit("/", 1)[1]
    assert pull_request_id != "new"
    pull_request = PullRequest.get(int(pull_request_id))

    # target_ref has to point to the ancestor's commit_id in order to
    # show the correct diff
    expected_target_ref = "branch:default:" + commit_ids["ancestor"]
    assert pull_request.target_ref == expected_target_ref

    # Check generated diff contents
    response = response.follow()
    response.mustcontain(no=["content_of_ancestor"])
    # The fixture content uses an underscore ("content_of_ancestor_child");
    # the previous hyphenated spelling could never match anything, which made
    # this negative check vacuous.
    response.mustcontain(no=["content_of_ancestor_child"])
    response.mustcontain("content_of_change")
r1974 | ||||
def test_merge_pull_request_enabled(self, pr_util, csrf_token, rcextensions_modification):
    """
    Merge an approved, mergeable pull request through the web view while a
    custom ``_push_hook`` rcextension is installed.

    Verifies that the pull request ends up closed and approved, that the
    latest user-log entries are the expected ones, and that the hook wrote
    its marker and the pushed commit ids into ``rcextension-output.txt``.
    """

    pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
    pull_request_id = pull_request.pull_request_id
    repo_name = pull_request.target_repo.scm_instance().name

    url = route_path(
        "pullrequest_merge",
        repo_name=repo_name,
        pull_request_id=pull_request_id,
    )

    # The hook output file is placed next to the ini file the test app was
    # configured from.
    rcstack_location = os.path.dirname(self.app._pyramid_registry.settings['__file__'])
    rc_ext_location = os.path.join(rcstack_location, 'rcextension-output.txt')
    # NOTE(review): the leading whitespace inside the hook source below could
    # not be recovered from the reviewed copy of this file - confirm it
    # matches what rcextensions_modification expects.
    # Doubled braces ({{action}}) survive the outer f-string as {action}, so
    # they are interpolated by the f-string inside the generated hook code.
    mods = [
        ('_push_hook',
         f"""
    import os
    action = kwargs['action']
    commit_ids = kwargs['commit_ids']
    with open('{rc_ext_location}', 'w') as f:
        f.write('test-execution'+os.linesep)
        f.write(f'{{action}}'+os.linesep)
        f.write(f'{{commit_ids}}'+os.linesep)
    return HookResponse(0, 'HOOK_TEST')
    """)
    ]
    # Add the hook
    with rcextensions_modification(rcstack_location, mods, create_if_missing=True, force_create=True):
        response = self.app.post(url, params={"csrf_token": csrf_token}).follow()

    pull_request = PullRequest.get(pull_request_id)
    assert response.status_int == 200
    assert pull_request.is_closed()
    assert_pull_request_status(pull_request, ChangesetStatus.STATUS_APPROVED)

    # Check the relevant log entries were added
    # (newest first, hence close -> merge -> comment.create)
    user_logs = UserLog.query().order_by(UserLog.user_log_id.desc()).limit(3)
    actions = [log.action for log in user_logs]
    pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
    expected_actions = [
        "repo.pull_request.close",
        "repo.pull_request.merge",
        "repo.pull_request.comment.create",
    ]
    assert actions == expected_actions
    # The fourth-newest entry is expected to be the push with the PR commits.
    user_logs = UserLog.query().order_by(UserLog.user_log_id.desc()).limit(4)
    actions = [log for log in user_logs]
    assert actions[-1].action == "user.push"
    assert actions[-1].action_data["commit_ids"] == pr_commit_ids

    # The hook must have run and recorded every commit id of the pull request.
    with open(rc_ext_location) as f:
        f_data = f.read()
        assert 'test-execution' in f_data
        for commit_id in pr_commit_ids:
            assert f'{commit_id}' in f_data
def test_merge_pull_request_forbidden_by_pre_push_hook(self, pr_util, csrf_token, rcextensions_modification, caplog):
    """
    Install a ``_pre_push_hook`` rcextension that returns ``HookResponse(1, ...)``
    and post a merge request for an approved, mergeable pull request.

    The test passes when the ``rhodecode.model.pull_request`` logger emitted
    the message 'Merge failed, not updating the pull request.'.
    """
    # Capture WARNING and above from the pull request model logger only.
    caplog.set_level(logging.WARNING, logger="rhodecode.model.pull_request")
    pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
    pull_request_id = pull_request.pull_request_id
    repo_name = pull_request.target_repo.scm_instance().name
    url = route_path(
        "pullrequest_merge",
        repo_name=repo_name,
        pull_request_id=pull_request_id,
    )
    rcstack_location = os.path.dirname(self.app._pyramid_registry.settings['__file__'])
    # NOTE(review): the leading whitespace inside the hook source below could
    # not be recovered from the reviewed copy of this file - confirm it
    # matches what rcextensions_modification expects.
    mods = [
        ('_pre_push_hook',
         f"""
    return HookResponse(1, 'HOOK_TEST_FORBIDDEN')
    """)
    ]
    # Add the hook
    with rcextensions_modification(rcstack_location, mods, create_if_missing=True, force_create=True):
        self.app.post(url, params={"csrf_token": csrf_token})
        # record_tuples are (logger_name, level, message); index 2 is the message
        assert 'Merge failed, not updating the pull request.' in [r[2] for r in caplog.record_tuples]
r1974 | ||||
def test_merge_pull_request_disabled(self, pr_util, csrf_token):
    """
    Posting a merge for a pull request created with ``mergeable=False`` must
    land on a page listing the "merging is disabled" failed check.
    """
    created = pr_util.create_pull_request(mergeable=False)
    pull_request_id = created.pull_request_id
    # Re-load the pull request from the database before building the URL.
    pull_request = PullRequest.get(pull_request_id)

    merge_url = route_path(
        "pullrequest_merge",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.post(merge_url, params={"csrf_token": csrf_token}).follow()

    assert response.status_int == 200
    expected_messages = (
        "Merge is not currently possible because of below failed checks.",
        "Server-side pull request merging is disabled.",
    )
    for message in expected_messages:
        response.mustcontain(message)
r1974 | ||||
@pytest.mark.skip_backends("svn")
def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
    """
    Posting a merge for a mergeable pull request that was not created as
    approved must land on a page listing the pending-approval failed check.
    """
    pull_request = pr_util.create_pull_request(mergeable=True)
    pull_request_id = pull_request.pull_request_id
    repo_name = pull_request.target_repo.scm_instance().name

    merge_url = route_path(
        "pullrequest_merge",
        repo_name=repo_name,
        pull_request_id=pull_request_id,
    )
    response = self.app.post(merge_url, params={"csrf_token": csrf_token}).follow()

    assert response.status_int == 200
    expected_messages = (
        "Merge is not currently possible because of below failed checks.",
        "Pull request reviewer approval is pending.",
    )
    for message in expected_messages:
        response.mustcontain(message)
r1974 | ||||
def test_merge_pull_request_renders_failure_reason(
    self, user_regular, csrf_token, pr_util
):
    """
    With ``PullRequestModel.merge_repo`` patched to return a PUSH_FAILED
    merge response, the flash message after the merge request must be that
    response's ``merge_status_message`` and not the text returned by the
    patched ``merge_status``.
    """
    pull_request = pr_util.create_pull_request(mergeable=True, approved=True)
    pull_request_id = pull_request.pull_request_id
    repo_name = pull_request.target_repo.scm_instance().name

    def push_failed_response(second_flag):
        # Both responses used in this test differ only in their second
        # positional argument.
        return MergeResponse(
            True,
            second_flag,
            Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"),
            MergeFailureReason.PUSH_FAILED,
            metadata={"target": "shadow repo", "merge_commit": "xxx"},
        )

    stubbed_merge = push_failed_response(False)
    model_patcher = mock.patch.multiple(
        PullRequestModel,
        merge_repo=mock.Mock(return_value=stubbed_merge),
        merge_status=mock.Mock(return_value=(None, True, "WRONG_MESSAGE")),
    )

    merge_url_kwargs = {
        "repo_name": repo_name,
        "pull_request_id": pull_request_id,
    }
    with model_patcher:
        response = self.app.post(
            route_path("pullrequest_merge", **merge_url_kwargs),
            params={"csrf_token": csrf_token},
            status=302,
        )

    expected = push_failed_response(True)
    assert_session_flash(response, expected.merge_status_message)
r1974 | ||||
def test_update_source_revision(self, backend, csrf_token):
    """
    After a new commit is pulled into the source repository, updating the
    pull request must add that commit to its revisions.
    """
    commit_ids = backend.create_master_repo(
        [{"message": msg} for msg in ("ancestor", "change", "change-2")]
    )
    target = backend.create_repo(heads=["ancestor"])
    source = backend.create_repo(heads=["change"])
    default_branch = backend.default_branch_name

    # Build the pull request by hand: "change" (source) -> "ancestor" (target)
    pull_request = PullRequest()

    pull_request.source_repo = source
    pull_request.source_ref = f"branch:{default_branch}:{commit_ids['change']}"

    pull_request.target_repo = target
    pull_request.target_ref = f"branch:{default_branch}:{commit_ids['ancestor']}"

    pull_request.revisions = [commit_ids["change"]]
    pull_request.title = "Test"
    pull_request.description = "Description"
    pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    pull_request.pull_request_state = PullRequest.STATE_CREATED

    session = Session()
    session.add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    # The source now contains ancestor - change - change-2
    backend.pull_heads(source, heads=["change-2"])
    target_repo_name = target.repo_name

    # Trigger the commit update of the pull request
    update_url = route_path(
        "pullrequest_update",
        repo_name=target_repo_name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        update_url, params={"update_commits": "true", "csrf_token": csrf_token}
    )

    show_url = route_path(
        "pullrequest_show",
        repo_name=target_repo_name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)

    assert response.status_int == 200
    for fragment in ("Pull request updated to", "with 1 added, 0 removed commits."):
        response.mustcontain(fragment)

    # Both revisions must be present now, newest first
    refreshed = PullRequest.get(pull_request_id)
    assert refreshed.revisions == [commit_ids["change-2"], commit_ids["change"]]
r1974 | ||||
def test_update_target_revision(self, backend, csrf_token):
    """
    Checks the pull request update when commits were added to both the
    target and the source: the target ref must move to the new target head
    and the revisions must be recalculated.
    """
    commits = [
        {"message": "commit-a"},  # initial PR target
        {"message": "commit-b"},  # initial PR source
        {"message": "commit-c"},
        {"message": "commit-a-prime", "branch": "feature", "parents": ["commit-a"]},
    ]

    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["commit-a"])
    source = backend.create_repo(heads=["commit-b"])
    target_repo_name = target.repo_name
    default_branch = backend.default_branch_name

    # Build the pull request by hand: commit-b (source) -> commit-a (target)
    pull_request = PullRequest()

    pull_request.target_repo = target
    pull_request.target_ref = f"branch:{default_branch}:{commit_ids['commit-a']}"

    pull_request.source_repo = source
    pull_request.source_ref = f"branch:{default_branch}:{commit_ids['commit-b']}"

    pull_request.revisions = [commit_ids["commit-b"]]
    pull_request.title = "Test"
    pull_request.description = "Description"
    pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    pull_request.pull_request_state = PullRequest.STATE_CREATED

    session = Session()
    session.add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    # target: pull commit-b on top of commit-a
    backend.pull_heads(target, heads=["commit-b"])
    # source: pull commit-c on top of commit-b
    backend.pull_heads(source, heads=["commit-c"])

    # Trigger the commit update of the pull request
    update_url = route_path(
        "pullrequest_update",
        repo_name=target_repo_name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        update_url,
        params={"update_commits": "true", "csrf_token": csrf_token},
        status=200,
    )

    # Only commit-c is left as a revision; the target ref moved to commit-b
    pull_request = PullRequest.get(pull_request_id)
    assert pull_request.revisions == [commit_ids["commit-c"]]
    expected_target_ref = f"branch:{backend.default_branch_name}:{commit_ids['commit-b']}"
    assert pull_request.target_ref == expected_target_ref

    show_url = route_path(
        "pullrequest_show",
        repo_name=target_repo_name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200
    for fragment in ("Pull request updated to", "with 1 added, 1 removed commits."):
        response.mustcontain(fragment)
r1974 | ||||
def test_update_target_revision_with_removal_of_1_commit_git(
    self, backend_git, csrf_token
):
    """
    Git only: after the target repository is reset two commits back, updating
    the pull request must succeed and report no added or removed commits.
    """
    backend = backend_git
    commits = [
        {"message": "master-commit-1"},
        {"message": "master-commit-2-change-1"},
        {"message": "master-commit-3-change-2"},
        {
            "message": "feat-commit-1",
            "parents": ["master-commit-1"],
            "branch": "feature",
        },
        {"message": "feat-commit-2", "branch": "feature"},
    ]
    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["master-commit-3-change-2"])
    source = backend.create_repo(heads=["feat-commit-2"])
    default_branch = backend.default_branch_name

    # Build the pull request by hand
    pull_request = PullRequest()
    pull_request.source_repo = source
    pull_request.source_ref = (
        f"branch:{default_branch}:{commit_ids['master-commit-3-change-2']}"
    )

    pull_request.target_repo = target
    pull_request.target_ref = f"branch:{default_branch}:{commit_ids['feat-commit-2']}"

    pull_request.revisions = [
        commit_ids[message] for message in ("feat-commit-1", "feat-commit-2")
    ]
    pull_request.title = "Test"
    pull_request.description = "Description"
    pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    pull_request.pull_request_state = PullRequest.STATE_CREATED

    session = Session()
    session.add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    # The PR exists; now simulate a force-push into the target that drops
    # the last two commits.
    target_vcs = target.scm_instance()
    target_vcs.config.clear_section("hooks")
    target_vcs.run_git_command(["reset", "--soft", "HEAD~2"])
    target_repo_name = target.repo_name

    # Trigger the commit update of the pull request
    update_url = route_path(
        "pullrequest_update",
        repo_name=target_repo_name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        update_url,
        params={"update_commits": "true", "csrf_token": csrf_token},
        status=200,
    )

    # The update message is looked up on the "new pull request" page.
    new_pr_url = route_path("pullrequest_new", repo_name=target_repo_name)
    response = self.app.get(new_pr_url)
    assert response.status_int == 200
    for fragment in ("Pull request updated to", "with 0 added, 0 removed commits."):
        response.mustcontain(fragment)
r2784 | ||||
@pytest.mark.xfail(reason="unable to fix after pygit update")
def test_update_pr_ancestor_reference(self, csrf_token, pr_util: PRTestUtility):
    """
    Move both sides of a pull request onto the ``feature`` branch and check
    that a forced update stores the new target ref and the rebased revision.

    Currently marked as an expected failure.
    """
    commits = [
        {"message": "ancestor"},
        {"message": "change"},
        {"message": "change-2"},
        {"message": "ancestor-new", "parents": ["ancestor"], "branch": "feature"},
        {"message": "change-rebased", "branch": "feature"},
    ]
    pull_request = pr_util.create_pull_request(
        commits,
        target_head="ancestor",
        source_head="change",
        revisions=["change"],
    )
    pull_request_id = pull_request.pull_request_id
    target_repo_name = pr_util.target_repository.repo_name
    commit_ids = pr_util.commit_ids

    # Starting point: one revision, and only the default branch on both sides.
    assert pull_request.revisions == [commit_ids["change"]]
    target_branches = list(pull_request.target_repo.scm_instance(cache=False).branches.keys())
    assert target_branches == [pr_util.backend.default_branch_name]
    source_branches = list(pull_request.source_repo.scm_instance(cache=False).branches.keys())
    assert source_branches == [pr_util.backend.default_branch_name]

    branch = "feature"
    pr_util.update_target_repository(head="ancestor-new", do_fetch=True)
    pr_util.set_pr_target_ref(
        ref_type="branch", ref_name=branch, ref_commit_id=commit_ids["ancestor-new"]
    )
    pr_util.update_source_repository(head="change-rebased", do_fetch=True)
    pr_util.set_pr_source_ref(
        ref_type="branch", ref_name=branch, ref_commit_id=commit_ids["change-rebased"]
    )

    # Both repositories must now know the feature branch as well.
    target_branches = list(pull_request.target_repo.scm_instance(cache=False).branches.keys())
    assert target_branches == [pr_util.backend.default_branch_name, branch]
    source_branches = list(pull_request.source_repo.scm_instance(cache=False).branches.keys())
    assert source_branches == [pr_util.backend.default_branch_name, branch]

    Session().add(pr_util.pull_request)
    Session().commit()

    update_url = route_path(
        "pullrequest_update",
        repo_name=target_repo_name,
        pull_request_id=pull_request_id,
    )
    update_params = {
        "update_commits": "true",
        "csrf_token": csrf_token,
        "force_refresh": True,
    }
    self.app.post(update_url, params=update_params, status=200)

    # NOTE: the follow-up check of the "pullrequest_show" page (expecting
    # "Pull request updated to" / "with 1 added, 0 removed commits.") is
    # disabled in this test.

    pull_request = PullRequest.get(pull_request_id)

    assert pull_request.target_ref == f"branch:{branch}:{commit_ids['ancestor-new']}"
    assert pull_request.revisions == [commit_ids["change-rebased"]]
def test_remove_pull_request_branch(self, backend_git, csrf_token):
    """
    Git only: when the source branch of a pull request is removed and garbage
    collected, the pull request page must still render (HTTP 200) and show
    the "Missing commits" alert.
    """
    branch_name = "development"
    repo = backend_git.create_repo(
        [
            {"message": "initial-commit"},
            {"message": "old-feature"},
            {"message": "new-feature", "branch": branch_name},
        ]
    )
    repo_name = repo.repo_name
    commit_ids = backend_git.commit_ids

    # Source and target live in the same repository.
    pull_request = PullRequest()
    pull_request.source_repo = repo
    pull_request.target_repo = repo
    pull_request.source_ref = f"branch:{branch_name}:{commit_ids['new-feature']}"
    pull_request.target_ref = (
        f"branch:{backend_git.default_branch_name}:{commit_ids['old-feature']}"
    )
    pull_request.revisions = [commit_ids["new-feature"]]
    pull_request.title = "Test"
    pull_request.description = "Description"
    pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    pull_request.pull_request_state = PullRequest.STATE_CREATED

    session = Session()
    session.add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    vcs = repo.scm_instance()
    vcs.remove_ref(f"refs/heads/{branch_name}")
    # NOTE(marcink): run GC to ensure the commits are gone
    vcs.run_gc()

    show_url = route_path(
        "pullrequest_show", repo_name=repo_name, pull_request_id=pull_request_id
    )
    response = self.app.get(show_url)

    assert response.status_int == 200

    alert_selector = "#changeset_compare_view_content .alert"
    response.assert_response().element_contains(
        alert_selector + " strong", "Missing commits"
    )
    response.assert_response().element_contains(
        alert_selector,
        "This pull request cannot be displayed, because one or more"
        " commits no longer exist in the source repository.",
    )
r1974 | ||||
def test_strip_commits_from_pull_request(self, backend, pr_util):
    """
    After the only revision of a pull request is stripped from the source
    repository, the pull request page must show the "Missing commits" alert
    together with the "Update commits" control.
    """
    pull_request = pr_util.create_pull_request(
        [
            {"message": "initial-commit"},
            {"message": "old-feature"},
            {"message": "new-feature"},
        ],
        target_head="initial-commit",
        source_head="new-feature",
        revisions=["new-feature"],
    )

    vcs = pr_util.source_repository.scm_instance()
    # Only the git backend is given the branch to strip from.
    strip_kwargs = {}
    if backend.alias == "git":
        strip_kwargs["branch_name"] = pr_util.backend.default_branch_name
    vcs.strip(pr_util.commit_ids["new-feature"], **strip_kwargs)

    show_url = route_path(
        "pullrequest_show",
        repo_name=pr_util.target_repository.repo_name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)

    assert response.status_int == 200

    alert_selector = "#changeset_compare_view_content .alert"
    response.assert_response().element_contains(
        alert_selector + " strong", "Missing commits"
    )
    response.assert_response().element_contains(
        alert_selector,
        "This pull request cannot be displayed, because one or more"
        " commits no longer exist in the source repository.",
    )
    response.assert_response().element_contains("#update_commits", "Update commits")
r1974 | ||||
def test_strip_commits_and_update(self, backend, pr_util, csrf_token):
    """
    Strip the only revision of a pull request from the source repository,
    then update the pull request.

    The update call must answer 200 with ``{"response": true,
    "redirect_url": null}`` and the pull request page must afterwards still
    render (no 500) and show the "Missing commits" alert.
    """
    commits = [
        {"message": "initial-commit"},
        {"message": "old-feature"},
        {"message": "new-feature", "parents": ["old-feature"]},
    ]
    pull_request = pr_util.create_pull_request(
        commits,
        target_head="old-feature",
        source_head="new-feature",
        revisions=["new-feature"],
        mergeable=True,
    )
    pr_id = pull_request.pull_request_id
    target_repo_name = pull_request.target_repo.repo_name

    vcs = pr_util.source_repository.scm_instance()
    if backend.alias == "git":
        # Use the backend's configured default branch instead of a
        # hard-coded "master", consistent with
        # test_strip_commits_from_pull_request.
        vcs.strip(
            pr_util.commit_ids["new-feature"],
            branch_name=pr_util.backend.default_branch_name,
        )
    else:
        vcs.strip(pr_util.commit_ids["new-feature"])

    url = route_path(
        "pullrequest_update", repo_name=target_repo_name, pull_request_id=pr_id
    )
    response = self.app.post(
        url, params={"update_commits": "true", "csrf_token": csrf_token}
    )

    assert response.status_int == 200
    assert json.loads(response.body) == json.loads(
        '{"response": true, "redirect_url": null}'
    )

    # Make sure that after update, it won't raise 500 errors
    response = self.app.get(
        route_path(
            "pullrequest_show", repo_name=target_repo_name, pull_request_id=pr_id
        )
    )

    assert response.status_int == 200
    response.assert_response().element_contains(
        "#changeset_compare_view_content .alert strong", "Missing commits"
    )
r1974 | ||||
def test_branch_is_a_link(self, pr_util):
    """
    With branch refs on both sides, the source and target ref labels
    must each sit inside an element whose ``href`` points at the
    commits view of the matching repository and branch.
    """
    pr = pr_util.create_pull_request()
    pr.source_ref = "branch:origin:1234567890abcdef"
    pr.target_ref = "branch:target:abcdef1234567890"
    Session().add(pr)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pr.target_repo.scm_instance().name,
        pull_request_id=pr.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200

    # The parent of each ref label must hold exactly one child element.
    source_parent = (
        response.assert_response().get_element(".pr-source-info").getparent()
    )
    assert len(source_parent) == 1
    target_parent = (
        response.assert_response().get_element(".pr-target-info").getparent()
    )
    assert len(target_parent) == 1

    source_repo_name = pr.source_repo.scm_instance().name
    expected_origin_link = route_path(
        "repo_commits", repo_name=source_repo_name, params={"branch": "origin"}
    )
    target_repo_name = pr.target_repo.scm_instance().name
    expected_target_link = route_path(
        "repo_commits", repo_name=target_repo_name, params={"branch": "target"}
    )

    assert source_parent.attrib["href"] == expected_origin_link
    assert target_parent.attrib["href"] == expected_target_link
r1974 | ||||
def test_bookmark_is_not_a_link(self, pr_util):
    """
    With bookmark refs on both sides, the ref labels are shown as
    plain text: their parent element carries no ``href`` attribute.
    """
    pr = pr_util.create_pull_request()
    pr.source_ref = "bookmark:origin:1234567890abcdef"
    pr.target_ref = "bookmark:target:abcdef1234567890"
    Session().add(pr)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pr.target_repo.scm_instance().name,
        pull_request_id=pr.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200

    expected_labels = (
        (".pr-source-info", "bookmark:origin"),
        (".pr-target-info", "bookmark:target"),
    )
    for selector, label in expected_labels:
        ref_element = response.assert_response().get_element(selector)
        assert ref_element.text.strip() == label
        assert ref_element.getparent().attrib.get("href") is None
r1974 | ||||
def test_tag_is_not_a_link(self, pr_util):
    """
    With tag refs on both sides, the ref labels are shown as plain
    text: their parent element carries no ``href`` attribute.
    """
    pr = pr_util.create_pull_request()
    pr.source_ref = "tag:origin:1234567890abcdef"
    pr.target_ref = "tag:target:abcdef1234567890"
    Session().add(pr)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pr.target_repo.scm_instance().name,
        pull_request_id=pr.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200

    expected_labels = (
        (".pr-source-info", "tag:origin"),
        (".pr-target-info", "tag:target"),
    )
    for selector, label in expected_labels:
        ref_element = response.assert_response().get_element(selector)
        assert ref_element.text.strip() == label
        assert ref_element.getparent().attrib.get("href") is None
r1974 | ||||
@pytest.mark.parametrize("mergeable", [True, False])
def test_shadow_repository_link(self, mergeable, pr_util, http_host_only_stub):
    """
    The pull request summary page shows the shadow repository link only
    for a mergeable pull request; otherwise no merge-info element may
    be present at all.
    """
    pr = pr_util.create_pull_request(
        mergeable=mergeable, enable_notifications=False
    )
    target_vcs = pr.target_repo.scm_instance()
    pr_id = pr.pull_request_id
    shadow_url_template = "{host}/{repo}/pull-request/{pr_id}/repository"
    shadow_url = shadow_url_template.format(
        host=http_host_only_stub, repo=target_vcs.name, pr_id=pr_id
    )

    show_url = route_path(
        "pullrequest_show", repo_name=target_vcs.name, pull_request_id=pr_id
    )
    response = self.app.get(show_url)

    if not mergeable:
        response.assert_response().no_element_exists(".pr-mergeinfo")
        return

    response.assert_response().element_value_contains(
        "input.pr-mergeinfo", shadow_url
    )
    response.assert_response().element_value_contains(
        "input.pr-mergeinfo ", "pr-merge"
    )
r1974 | ||||
@pytest.mark.usefixtures("app")
@pytest.mark.backends("git", "hg")
class TestPullrequestsControllerDelete(object):
    """
    Tests for the delete controls on the pull request page, for deleting
    pull request comments, and for pull request routes on archived repos.
    """

    def _show_pull_request(self, pull_request):
        """GET the show page of ``pull_request`` and return the response."""
        show_url = route_path(
            "pullrequest_show",
            repo_name=pull_request.target_repo.scm_instance().name,
            pull_request_id=pull_request.pull_request_id,
        )
        return self.app.get(show_url)

    def test_pull_request_delete_button_permissions_admin(
        self, autologin_user, user_admin, pr_util
    ):
        """The page of an admin-authored pull request has the delete control."""
        pr = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )

        page = self._show_pull_request(pr)

        page.mustcontain('id="delete_pullrequest"')
        page.mustcontain("Confirm to delete this pull request")

    def test_pull_request_delete_button_permissions_owner(
        self, autologin_regular_user, user_regular, pr_util
    ):
        """The page of a pull request by the regular user has the delete control."""
        pr = pr_util.create_pull_request(
            author=user_regular.username, enable_notifications=False
        )

        page = self._show_pull_request(pr)

        page.mustcontain('id="delete_pullrequest"')
        page.mustcontain("Confirm to delete this pull request")

    def test_pull_request_delete_button_permissions_forbidden(
        self, autologin_regular_user, user_regular, user_admin, pr_util
    ):
        """With the regular-user login, an admin's pull request has no delete control."""
        pr = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )

        page = self._show_pull_request(pr)

        page.mustcontain(
            no=['id="delete_pullrequest"', "Confirm to delete this pull request"]
        )

    def test_pull_request_delete_button_permissions_can_update_cannot_delete(
        self, autologin_regular_user, user_regular, user_admin, pr_util, user_util
    ):
        """
        After granting the regular user write permission on the target
        repo, the edit control and the delete element id are present,
        but the delete confirmation text is not.
        """
        pr = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )
        user_util.grant_user_permission_to_repo(
            pr.target_repo, user_regular, "repository.write"
        )

        page = self._show_pull_request(pr)

        page.mustcontain('id="open_edit_pullrequest"')
        page.mustcontain('id="delete_pullrequest"')
        page.mustcontain(no=["Confirm to delete this pull request"])

    def test_delete_comment_returns_404_if_comment_does_not_exist(
        self, autologin_user, pr_util, user_admin, csrf_token, xhr_header
    ):
        """Deleting comment id 1024404, which this test never creates, answers 404."""
        pr = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )

        delete_url = route_path(
            "pullrequest_comment_delete",
            repo_name=pr.target_repo.scm_instance().name,
            pull_request_id=pr.pull_request_id,
            comment_id=1024404,
        )
        self.app.post(
            delete_url,
            extra_environ=xhr_header,
            params={"csrf_token": csrf_token},
            status=404,
        )

    def test_delete_comment(
        self, autologin_user, pr_util, user_admin, csrf_token, xhr_header
    ):
        """Deleting an existing comment answers 200 with the body ``true``."""
        pr = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )
        comment_id = pr_util.create_comment().comment_id

        delete_url = route_path(
            "pullrequest_comment_delete",
            repo_name=pr.target_repo.scm_instance().name,
            pull_request_id=pr.pull_request_id,
            comment_id=comment_id,
        )
        response = self.app.post(
            delete_url,
            extra_environ=xhr_header,
            params={"csrf_token": csrf_token},
            status=200,
        )
        assert response.text == "true"

    @pytest.mark.parametrize(
        "url_type",
        [
            "pullrequest_new",
            "pullrequest_create",
            "pullrequest_update",
            "pullrequest_merge",
        ],
    )
    def test_pull_request_is_forbidden_on_archived_repo(
        self, autologin_user, backend, xhr_header, user_util, url_type
    ):
        """
        A GET on each pull request route of an archived repository
        redirects (302) and flashes the "not supported" message.
        """
        # create a temporary repo and flag it as archived
        repo_name = user_util.create_repo(repo_type=backend.alias).repo_name
        Repository.get_by_repo_name(repo_name).archived = True
        Session().commit()

        archived_url = route_path(url_type, repo_name=repo_name, pull_request_id=1)
        response = self.app.get(archived_url, status=302)

        assert_session_flash(
            response, "Action not supported for archived repository."
        )
r1974 | ||||
def assert_pull_request_status(pull_request, expected_status):
    """
    Assert that the calculated review status of ``pull_request``
    equals ``expected_status``.
    """
    status_model = ChangesetStatusModel()
    calculated_status = status_model.calculated_review_status(
        pull_request=pull_request
    )
    assert calculated_status == expected_status
@pytest.mark.parametrize("route", ["pullrequest_new", "pullrequest_create"])
@pytest.mark.usefixtures("autologin_user")
def test_forbidde_to_repo_summary_for_svn_repositories(backend_svn, app, route):
    """A GET on the parametrized pull request route of the svn repo answers 404."""
    svn_route_url = route_path(route, repo_name=backend_svn.repo_name)
    app.get(svn_route_url, status=404)