##// END OF EJS Templates
fix(caching): fixed problems with Cache query for users....
fix(caching): fixed problems with the cache query for users. The old way of querying caused the user "get" query to always be cached, returning stale results even in 2FA forms. The new limited query doesn't cache the user object, which resolves these issues.

File last commit:

r5226:587a1881 default
r5365:ae8a165b default
Show More
test_repo_pullrequests.py
1937 lines | 71.3 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import mock
import pytest
import rhodecode
from rhodecode.lib import helpers as h
from rhodecode.lib.vcs.backends.base import MergeResponse, MergeFailureReason, Reference
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.lib.ext_json import json
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.db import (
PullRequest,
ChangesetStatus,
UserLog,
Notification,
ChangesetComment,
Repository,
)
from rhodecode.model.meta import Session
from rhodecode.model.pull_request import PullRequestModel
from rhodecode.model.user import UserModel
from rhodecode.model.comment import CommentsModel
from rhodecode.tests import (
assert_session_flash,
TEST_USER_ADMIN_LOGIN,
TEST_USER_REGULAR_LOGIN,
)
from rhodecode.tests.fixture_mods.fixture_utils import PRTestUtility
from rhodecode.tests.routes import route_path
@pytest.mark.usefixtures("app", "autologin_user")
@pytest.mark.backends("git", "hg")
class TestPullrequestsView(object):
def test_index(self, backend):
    """Smoke test: request the "new pull request" page of the backend repo."""
    new_pr_url = route_path("pullrequest_new", repo_name=backend.repo_name)
    self.app.get(new_pr_url)
def test_option_menu_create_pull_request_exists(self, backend):
    """The repository summary page must contain the "Create Pull Request" link."""
    name = backend.repo_name
    summary_page = self.app.get(h.route_path("repo_summary", repo_name=name))

    new_pr_url = route_path("pullrequest_new", repo_name=name)
    expected_link = f'<a href="{new_pr_url}">Create Pull Request</a>'
    summary_page.mustcontain(expected_link)
def test_create_pr_form_with_raw_commit_id(self, backend):
    """The creation form answers with HTTP 200 when given a raw commit id."""
    repo = backend.repo
    form_url = route_path(
        "pullrequest_new",
        repo_name=repo.repo_name,
        commit=repo.get_commit().raw_id,
    )
    self.app.get(form_url, status=200)
@pytest.mark.parametrize("pr_merge_enabled", [True, False])
@pytest.mark.parametrize("range_diff", ["0", "1"])
def test_show(self, pr_util, pr_merge_enabled, range_diff):
    """Render a pull request page for every merge / range-diff combination.

    Checks that all commits, the target reference and the merge box are
    rendered, and that the merge message matches the merge setting.
    """
    pull_request = pr_util.create_pull_request(
        mergeable=pr_merge_enabled, enable_notifications=False
    )
    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
        params={"range-diff": range_diff},
    )
    page = self.app.get(show_url)

    # every commit that is part of the pull request has to be listed
    for commit_id in pull_request.revisions:
        page.mustcontain(commit_id)

    page.mustcontain(pull_request.target_ref_parts.type)
    page.mustcontain(pull_request.target_ref_parts.name)
    page.mustcontain('class="pull-request-merge"')

    merge_message = (
        "Pull request reviewer approval is pending"
        if pr_merge_enabled
        else "Server-side pull request merging is disabled."
    )
    page.mustcontain(merge_message)

    if range_diff == "1":
        page.mustcontain("Turn off: Show the diff as commit range")
def test_show_versions_of_pr(self, backend, csrf_token):
    """Walk a pull request through three updates and inspect each version.

    A pull request containing ``commit-1`` is created through the web form.
    Then ``commit-2`` .. ``commit-4`` are pulled into the source repo one at
    a time, each followed by an "update commits" request. Afterwards the
    latest view, every stored version and the diffs between versions are
    checked for the expected commits and added/context lines.
    """

    def file_with_lines(count):
        # test-file.txt holding LINE1 .. LINE<count>, one per line
        content = b"".join(b"LINE%d\n" % number for number in range(1, count + 1))
        return FileNode(b"test-file.txt", content)

    def cb_line(text):
        # markup fragment the assertions use for an added diff line
        return f'cb-addition"></span><span>{text}</span>'

    def cb_context(text):
        # markup fragment the assertions use for a context diff line
        return (
            '<span class="cb-code"><span class="cb-action cb-context">'
            f"</span><span>{text}</span></span>"
        )

    commits = [
        {"message": "initial-commit", "added": [file_with_lines(1)]},
        {"message": "commit-1", "changed": [file_with_lines(2)]},
        # Up to here is the initial version of the PR, which changes a
        # single line. Each of the three commits below adds another line
        # and is brought in by its own update step.
        {"message": "commit-2", "changed": [file_with_lines(3)]},
        {"message": "commit-3", "changed": [file_with_lines(4)]},
        {"message": "commit-4", "changed": [file_with_lines(5)]},
    ]
    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["initial-commit"])
    source = backend.create_repo(heads=["commit-1"])
    source_repo_name = source.repo_name
    target_repo_name = target.repo_name

    branch = backend.default_branch_name
    target_ref = f"branch:{branch}:{commit_ids['initial-commit']}"
    source_ref = f"branch:{branch}:{commit_ids['commit-1']}"

    pr_fields = [
        ("source_repo", source_repo_name),
        ("source_ref", source_ref),
        ("target_repo", target_repo_name),
        ("target_ref", target_ref),
        ("common_ancestor", commit_ids["initial-commit"]),
        ("pullrequest_title", "Title"),
        ("pullrequest_desc", "Description"),
        ("description_renderer", "markdown"),
    ]
    reviewer_fields = [
        ("__start__", "review_members:sequence"),
        ("__start__", "reviewer:mapping"),
        ("user_id", "1"),
        ("__start__", "reasons:sequence"),
        ("reason", "Some reason"),
        ("__end__", "reasons:sequence"),
        ("__start__", "rules:sequence"),
        ("__end__", "rules:sequence"),
        ("mandatory", "False"),
        ("__end__", "reviewer:mapping"),
        ("__end__", "review_members:sequence"),
    ]
    revision_fields = [
        ("__start__", "revisions:sequence"),
        ("revisions", commit_ids["commit-1"]),
        ("__end__", "revisions:sequence"),
    ]
    trailing_fields = [("user", ""), ("csrf_token", csrf_token)]

    response = self.app.post(
        route_path("pullrequest_create", repo_name=source_repo_name),
        pr_fields + reviewer_fields + revision_fields + trailing_fields,
        status=302,
    )

    # the redirect target ends with the id of the freshly created PR
    pull_request_id = response.headers["Location"].rsplit("/", 1)[1]
    assert pull_request_id != "new"
    pull_request = PullRequest.get(int(pull_request_id))
    pull_request_id = pull_request.pull_request_id

    def show_pr(**query):
        # GET the pull request page, optionally with query parameters
        url_kwargs = {
            "repo_name": target_repo_name,
            "pull_request_id": pull_request_id,
        }
        if query:
            url_kwargs["params"] = query
        return self.app.get(route_path("pullrequest_show", **url_kwargs))

    # Show initial version of PR
    response = show_pr()
    response.mustcontain("commit-1")
    for later_commit in ("commit-2", "commit-3", "commit-4"):
        response.mustcontain(no=[later_commit])
    response.mustcontain(cb_line("LINE2"))
    for later_line in ("LINE3", "LINE4", "LINE5"):
        response.mustcontain(no=[later_line])

    # update the PR three times, pulling one more commit each time
    for head in ("commit-2", "commit-3", "commit-4"):
        source_repo = Repository.get_by_repo_name(source_repo_name)
        backend.pull_heads(source_repo, heads=[head])
        response = self.app.post(
            route_path(
                "pullrequest_update",
                repo_name=target_repo_name,
                pull_request_id=pull_request_id,
            ),
            params={"update_commits": "true", "csrf_token": csrf_token},
        )

    # Show final version !
    response = show_pr()

    # 3 updates, and the latest == 4
    response.mustcontain("4 versions available for this pull request")
    response.mustcontain(no=["rhodecode diff rendering error"])

    # the latest view must list all 4 commits and all 4 added lines
    for message in ("commit-1", "commit-2", "commit-3", "commit-4"):
        response.mustcontain(message)
    for added in ("LINE2", "LINE3", "LINE4", "LINE5"):
        response.mustcontain(cb_line(added))

    # fetch versions
    pr = PullRequest.get(pull_request_id)
    versions = [version.pull_request_version_id for version in pr.versions.all()]
    assert len(versions) == 3

    # Expectations per version number: (in response, not in response).
    # Entry 4 describes the latest state; the loop below only visits the
    # three stored versions, so it is not looked up.
    commit_tests = {
        1: (["commit-1"], ["commit-2", "commit-3", "commit-4"]),
        2: (["commit-1", "commit-2"], ["commit-3", "commit-4"]),
        3: (["commit-1", "commit-2", "commit-3"], ["commit-4"]),
        4: (["commit-1", "commit-2", "commit-3", "commit-4"], []),
    }
    diff_tests = {
        1: (["LINE2"], ["LINE3", "LINE4", "LINE5"]),
        2: (["LINE2", "LINE3"], ["LINE4", "LINE5"]),
        3: (["LINE2", "LINE3", "LINE4"], ["LINE5"]),
        4: (["LINE2", "LINE3", "LINE4", "LINE5"], []),
    }

    # show each stored version on its own
    for idx, ver in enumerate(versions, 1):
        response = show_pr(version=ver)
        response.mustcontain(no=["rhodecode diff rendering error"])
        response.mustcontain(f"Showing changes at v{idx}")

        present, absent = commit_tests[idx]
        for message in present:
            response.mustcontain(message)
        for message in absent:
            response.mustcontain(no=message)

        present, absent = diff_tests[idx]
        for line in present:
            response.mustcontain(cb_line(line))
        for line in absent:
            response.mustcontain(no=line)

    # show diff between versions, always starting from the first version
    diff_compare_tests = {
        1: (["LINE3"], ["LINE1", "LINE2"]),
        2: (["LINE3", "LINE4"], ["LINE1", "LINE2"]),
        3: (["LINE3", "LINE4", "LINE5"], ["LINE1", "LINE2"]),
    }
    for idx, ver in enumerate(versions, 1):
        adds, context = diff_compare_tests[idx]
        # no stored version follows the third one, so compare to "latest"
        to_ver = "latest" if idx == 3 else ver + 1
        response = show_pr(from_version=versions[0], version=to_ver)
        response.mustcontain(no=["rhodecode diff rendering error"])
        for line in adds:
            response.mustcontain(cb_line(line))
        for line in context:
            response.mustcontain(cb_context(line))

    # test version v2 -> v3
    response = show_pr(from_version=versions[1], version=versions[2])
    for line in ("LINE1", "LINE2", "LINE3"):
        response.mustcontain(cb_context(line))
    response.mustcontain(cb_line("LINE4"))
def test_close_status_visibility(self, pr_util, user_util, csrf_token):
    """The close action only shows up once the user gets write permission."""

    def show_pr(pr):
        return self.app.get(
            route_path(
                "pullrequest_show",
                repo_name=pr.target_repo.scm_instance().name,
                pull_request_id=pr.pull_request_id,
            )
        )

    # Switch sessions: log out, then log in as the regular user
    self.app.post(h.route_path("logout"), params={"csrf_token": csrf_token})
    credentials = {"username": TEST_USER_REGULAR_LOGIN, "password": "test12"}
    self.app.post(h.route_path("login"), credentials)

    pull_request = pr_util.create_pull_request(author=TEST_USER_REGULAR_LOGIN)

    page = show_pr(pull_request)
    page.mustcontain("Server-side pull request merging is disabled.")
    # a regular user without merge permissions does not see the action
    page.assert_response().no_element_exists("#close-pull-request-action")

    regular_user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
    user_util.grant_user_permission_to_repo(
        pull_request.target_repo, regular_user, "repository.write"
    )

    page = show_pr(pull_request)
    page.mustcontain("Server-side pull request merging is disabled.")
    # with the permission granted, the CLOSE button is rendered
    page.assert_response().one_element_exists("#close-pull-request-action")
def test_show_invalid_commit_id(self, pr_util):
    """The page still renders when a stored revision cannot be looked up."""
    pull_request = pr_util.create_pull_request()

    # Store a revision that does not exist to provoke a lookup error
    pull_request.revisions = ["invalid"]
    Session().add(pull_request)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    page = self.app.get(show_url)

    for commit_id in pull_request.revisions:
        page.mustcontain(commit_id)
def test_show_invalid_source_reference(self, pr_util):
    """Requesting a PR whose source reference is invalid must not fail."""
    pull_request = pr_util.create_pull_request()
    pull_request.source_ref = "branch:b:invalid"
    Session().add(pull_request)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    self.app.get(show_url)
def test_edit_title_description(self, pr_util, csrf_token):
    """Posting a new title and description updates the stored pull request."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id

    update_url = route_path(
        "pullrequest_update",
        repo_name=pull_request.target_repo.repo_name,
        pull_request_id=pull_request_id,
    )
    form = {
        "edit_pull_request": "true",
        "title": "New title",
        "description": "New description",
        "csrf_token": csrf_token,
    }
    response = self.app.post(update_url, params=form)

    assert_session_flash(
        response, "Pull request title & description updated.", category="success"
    )
    # read the pull request back from the database to verify the change
    updated = PullRequest.get(pull_request_id)
    assert updated.title == "New title"
    assert updated.description == "New description"
def test_edit_title_description_special(self, pr_util, csrf_token):
    """A title with format-style braces is exposed escaped via ``title_safe``."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id

    update_url = route_path(
        "pullrequest_update",
        repo_name=pull_request.target_repo.repo_name,
        pull_request_id=pull_request_id,
    )
    form = {
        "edit_pull_request": "true",
        "title": "New title {} {2} {foo}",
        "description": "New description",
        "csrf_token": csrf_token,
    }
    response = self.app.post(update_url, params=form)

    assert_session_flash(
        response, "Pull request title & description updated.", category="success"
    )
    updated = PullRequest.get(pull_request_id)
    # every brace of the submitted title is expected doubled
    assert updated.title_safe == "New title {{}} {{2}} {{foo}}"
def test_edit_title_description_closed(self, pr_util, csrf_token):
    """Editing a closed pull request is answered with an error flash."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id
    repo_name = pull_request.target_repo.repo_name
    pr_util.close()

    update_url = route_path(
        "pullrequest_update",
        repo_name=repo_name,
        pull_request_id=pull_request_id,
    )
    form = {
        "edit_pull_request": "true",
        "title": "New title",
        "description": "New description",
        "csrf_token": csrf_token,
    }
    response = self.app.post(update_url, params=form, status=200)

    assert_session_flash(
        response, "Cannot update closed pull requests.", category="error"
    )
def test_update_invalid_source_reference(self, pr_util, csrf_token):
    """Updating commits with a broken source ref flashes MISSING_SOURCE_REF."""
    from rhodecode.lib.vcs.backends.base import UpdateFailureReason

    pull_request = pr_util.create_pull_request()
    pull_request.source_ref = "branch:invalid-branch:invalid-commit-id"
    Session().add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    update_url = route_path(
        "pullrequest_update",
        repo_name=pull_request.target_repo.repo_name,
        pull_request_id=pull_request_id,
    )
    response = self.app.post(
        update_url, params={"update_commits": "true", "csrf_token": csrf_token}
    )

    status_messages = PullRequestModel.UPDATE_STATUS_MESSAGES
    expected_msg = str(status_messages[UpdateFailureReason.MISSING_SOURCE_REF])
    assert_session_flash(response, expected_msg, category="error")
def test_missing_target_reference(self, pr_util, csrf_token):
    """A PR with a broken target ref shows the MISSING_TARGET_REF message."""
    pull_request = pr_util.create_pull_request(approved=True, mergeable=True)

    unicode_reference = "branch:invalid-branch:invalid-commit-id"
    pull_request.target_ref = unicode_reference
    Session().add(pull_request)
    Session().commit()

    pull_request_id = pull_request.pull_request_id
    pull_request_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.repo_name,
        pull_request_id=pull_request_id,
    )
    response = self.app.get(pull_request_url)

    # Build the merge response for the same failure only to obtain the
    # status message that should be rendered on the page.
    stub_reference = Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID")
    failure_metadata = {
        "target_ref": PullRequest.unicode_to_reference(unicode_reference)
    }
    merge_resp = MergeResponse(
        True,
        True,
        stub_reference,
        MergeFailureReason.MISSING_TARGET_REF,
        metadata=failure_metadata,
    )
    response.assert_response().element_contains(
        'div[data-role="merge-message"]', merge_resp.merge_status_message
    )
def test_comment_and_close_pull_request_custom_message_approved(
    self, pr_util, csrf_token, xhr_header
):
    """Close an approved pull request with a custom closing comment.

    Verifies that the latest journal entry of the author for the target
    repository is the close action, that the pull request is closed with
    an "approved" status, and that the custom text is the last comment.
    """
    pull_request = pr_util.create_pull_request(approved=True)
    pull_request_id = pull_request.pull_request_id
    author = pull_request.user_id
    repo = pull_request.target_repo.repo_id

    self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=pull_request.target_repo.scm_instance().name,
            pull_request_id=pull_request_id,
        ),
        params={
            "close_pull_request": "1",
            "text": "Closing a PR",
            "csrf_token": csrf_token,
        },
        extra_environ=xhr_header,
    )

    journal = (
        UserLog.query()
        .filter(UserLog.user_id == author)
        .filter(UserLog.repository_id == repo)
        .order_by(UserLog.user_log_id.asc())
        .all()
    )
    assert journal[-1].action == "repo.pull_request.close"

    pull_request = PullRequest.get(pull_request_id)
    assert pull_request.is_closed()

    status = ChangesetStatusModel().get_status(
        pull_request.source_repo, pull_request=pull_request
    )
    assert status == ChangesetStatus.STATUS_APPROVED

    # Query through the class, as the other tests in this class do,
    # instead of creating a throwaway model instance first.
    comments = (
        ChangesetComment.query()
        .filter(ChangesetComment.pull_request == pull_request)
        .order_by(ChangesetComment.comment_id.asc())
        .all()
    )
    assert comments[-1].text == "Closing a PR"
def test_comment_force_close_pull_request_rejected(
    self, pr_util, csrf_token, xhr_header
):
    """Closing a PR that has two pending reviewers marks it as rejected."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id

    reviewers = [
        (1, ["reason"], False, "reviewer", []),
        (2, ["reason2"], False, "reviewer", []),
    ]
    PullRequestModel().update_reviewers(
        pull_request_id, reviewers, pull_request.author
    )
    author = pull_request.user_id
    repo = pull_request.target_repo.repo_id

    close_url = route_path(
        "pullrequest_comment_create",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        close_url,
        params={"close_pull_request": "1", "csrf_token": csrf_token},
        extra_environ=xhr_header,
    )

    pull_request = PullRequest.get(pull_request_id)

    journal_query = UserLog.query().filter(
        UserLog.user_id == author, UserLog.repository_id == repo
    )
    journal = journal_query.order_by(UserLog.user_log_id.asc()).all()
    assert journal[-1].action == "repo.pull_request.close"

    # check only the latest status, not the review status
    latest_status = ChangesetStatusModel().get_status(
        pull_request.source_repo, pull_request=pull_request
    )
    assert latest_status == ChangesetStatus.STATUS_REJECTED
def test_comment_and_close_pull_request(self, pr_util, csrf_token, xhr_header):
    """Closing a pull request without approval leaves it closed and rejected."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id

    close_url = route_path(
        "pullrequest_comment_create",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.post(
        close_url,
        params={"close_pull_request": "true", "csrf_token": csrf_token},
        extra_environ=xhr_header,
    )
    assert response.json

    pull_request = PullRequest.get(pull_request_id)
    assert pull_request.is_closed()

    # check only the latest status, not the review status
    latest_status = ChangesetStatusModel().get_status(
        pull_request.source_repo, pull_request=pull_request
    )
    assert latest_status == ChangesetStatus.STATUS_REJECTED
def test_comment_and_close_pull_request_try_edit_comment(
    self, pr_util, csrf_token, xhr_header
):
    """Comments of a closed pull request cannot be edited (HTTP 403)."""
    pull_request = pr_util.create_pull_request()
    pull_request_id = pull_request.pull_request_id
    target_scm_name = pull_request.target_repo.scm_instance().name

    close_response = self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=target_scm_name,
            pull_request_id=pull_request_id,
        ),
        params={
            "close_pull_request": "true",
            "csrf_token": csrf_token,
        },
        extra_environ=xhr_header,
    )
    assert close_response.json

    # reload the pull request and the repo name after the close request
    pull_request = PullRequest.get(pull_request_id)
    target_scm_name = pull_request.target_repo.scm_instance().name
    assert pull_request.is_closed()

    # check only the latest status, not the review status
    latest_status = ChangesetStatusModel().get_status(
        pull_request.source_repo, pull_request=pull_request
    )
    assert latest_status == ChangesetStatus.STATUS_REJECTED

    # the JSON payload is keyed by the ids of the created comments
    for comment_id in close_response.json:
        edit_url = route_path(
            "pullrequest_comment_edit",
            repo_name=target_scm_name,
            pull_request_id=pull_request_id,
            comment_id=comment_id,
        )
        edit_response = self.app.post(
            edit_url,
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": "test",
            },
            status=403,
        )
        assert edit_response.status_int == 403
def test_comment_and_comment_edit(self, pr_util, csrf_token, xhr_header):
    """A freshly created comment can be edited; the new text is stored."""
    pull_request = pr_util.create_pull_request()
    target_scm_name = pull_request.target_repo.scm_instance().name

    create_response = self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
        ),
        params={
            "csrf_token": csrf_token,
            "text": "init",
        },
        extra_environ=xhr_header,
    )
    assert create_response.json

    # the JSON payload is keyed by the ids of the created comments
    for comment_id in create_response.json:
        assert comment_id
        new_text = "test"
        edit_url = route_path(
            "pullrequest_comment_edit",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
            comment_id=comment_id,
        )
        self.app.post(
            edit_url,
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": new_text,
                "version": "0",
            },
        )

        stored_comment = (
            ChangesetComment.query()
            .filter(ChangesetComment.comment_id == comment_id)
            .first()
        )
        assert new_text == stored_comment.text
def test_comment_and_comment_edit_special(self, pr_util, csrf_token, xhr_header):
    """Re-submitting a comment with its unchanged text yields HTTP 404."""
    pull_request = pr_util.create_pull_request()
    target_scm_name = pull_request.target_repo.scm_instance().name
    original_text = "init"

    create_response = self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
        ),
        params={
            "csrf_token": csrf_token,
            "text": original_text,
        },
        extra_environ=xhr_header,
    )
    assert create_response.json

    for comment_id in create_response.json:
        edit_url = route_path(
            "pullrequest_comment_edit",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
            comment_id=comment_id,
        )
        # the edit carries exactly the text the comment was created with
        edit_response = self.app.post(
            edit_url,
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": original_text,
                "version": "0",
            },
            status=404,
        )
        assert edit_response.status_int == 404
def test_comment_and_try_edit_already_edited(self, pr_util, csrf_token, xhr_header):
    """A second edit sent with the stale version "0" is refused (HTTP 409)."""
    pull_request = pr_util.create_pull_request()
    target_scm_name = pull_request.target_repo.scm_instance().name

    create_response = self.app.post(
        route_path(
            "pullrequest_comment_create",
            repo_name=target_scm_name,
            pull_request_id=pull_request.pull_request_id,
        ),
        params={
            "csrf_token": csrf_token,
            "text": "init",
        },
        extra_environ=xhr_header,
    )
    assert create_response.json

    def edit_comment(comment_id, text, **post_kwargs):
        # POST an edit that always claims to be based on version "0"
        return self.app.post(
            route_path(
                "pullrequest_comment_edit",
                repo_name=target_scm_name,
                pull_request_id=pull_request.pull_request_id,
                comment_id=comment_id,
            ),
            extra_environ=xhr_header,
            params={
                "csrf_token": csrf_token,
                "text": text,
                "version": "0",
            },
            **post_kwargs,
        )

    for comment_id in create_response.json:
        first_text = "test"
        edit_comment(comment_id, first_text)

        second_text = "test_v2"
        conflict_response = edit_comment(comment_id, second_text, status=409)
        assert conflict_response.status_int == 409

        stored_comment = (
            ChangesetComment.query()
            .filter(ChangesetComment.comment_id == comment_id)
            .first()
        )
        stored_text = stored_comment.text
        # only the first edit may have been persisted
        assert first_text == stored_text
        assert second_text != stored_text
def test_comment_and_comment_edit_permissions_forbidden(
    self,
    autologin_regular_user,
    user_regular,
    user_admin,
    pr_util,
    csrf_token,
    xhr_header,
):
    """Editing a comment created for the admin user is forbidden (HTTP 403).

    The request runs with the ``autologin_regular_user`` fixture active.
    """
    pull_request = pr_util.create_pull_request(
        author=user_admin.username, enable_notifications=False
    )
    admin_comment = CommentsModel().create(
        text="test",
        repo=pull_request.target_repo.scm_instance().name,
        user=user_admin,
        pull_request=pull_request,
    )

    edit_url = route_path(
        "pullrequest_comment_edit",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
        comment_id=admin_comment.comment_id,
    )
    response = self.app.post(
        edit_url,
        extra_environ=xhr_header,
        params={
            "csrf_token": csrf_token,
            "text": "test_text",
        },
        status=403,
    )
    assert response.status_int == 403
def test_create_pull_request(self, backend, csrf_token):
    """Create a two-commit pull request via the form and check what is stored."""
    commits = [{"message": name} for name in ("ancestor", "change", "change2")]
    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["ancestor"])
    source = backend.create_repo(heads=["change2"])

    source_ref = f"branch:default:{commit_ids['change2']}"
    target_ref = f"branch:default:{commit_ids['ancestor']}"

    pr_fields = [
        ("source_repo", source.repo_name),
        ("source_ref", source_ref),
        ("target_repo", target.repo_name),
        ("target_ref", target_ref),
        ("common_ancestor", commit_ids["ancestor"]),
        ("pullrequest_title", "Title"),
        ("pullrequest_desc", "Description"),
        ("description_renderer", "markdown"),
    ]
    reviewer_fields = [
        ("__start__", "review_members:sequence"),
        ("__start__", "reviewer:mapping"),
        ("user_id", "1"),
        ("__start__", "reasons:sequence"),
        ("reason", "Some reason"),
        ("__end__", "reasons:sequence"),
        ("__start__", "rules:sequence"),
        ("__end__", "rules:sequence"),
        ("mandatory", "False"),
        ("__end__", "reviewer:mapping"),
        ("__end__", "review_members:sequence"),
    ]
    revision_fields = [
        ("__start__", "revisions:sequence"),
        ("revisions", commit_ids["change"]),
        ("revisions", commit_ids["change2"]),
        ("__end__", "revisions:sequence"),
    ]
    trailing_fields = [("user", ""), ("csrf_token", csrf_token)]

    response = self.app.post(
        route_path("pullrequest_create", repo_name=source.repo_name),
        pr_fields + reviewer_fields + revision_fields + trailing_fields,
        status=302,
    )

    # the redirect target ends with the id of the freshly created PR
    pull_request_id = response.headers["Location"].rsplit("/", 1)[1]
    assert pull_request_id != "new"
    pull_request = PullRequest.get(int(pull_request_id))

    # check that we have now both revisions
    assert pull_request.revisions == [commit_ids["change2"], commit_ids["change"]]
    assert pull_request.source_ref == source_ref
    assert pull_request.target_ref == target_ref
def test_reviewer_notifications(self, backend, csrf_token):
    """Creating a PR and changing its reviewers each add one notification."""
    # The PR is created via app.post on purpose, so that the notifications
    # for the new PR are created properly.
    commits = [
        {
            "message": "ancestor",
            "added": [FileNode(b"file_A", content=b"content_of_ancestor")],
        },
        {
            "message": "change",
            "added": [FileNode(b"file_a", content=b"content_of_change")],
        },
        {"message": "change-child"},
        {
            "message": "ancestor-child",
            "parents": ["ancestor"],
            "branch": "feature",
            "added": [FileNode(b"file_c", content=b"content_of_ancestor_child")],
        },
        {"message": "ancestor-child-2", "branch": "feature"},
    ]
    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["ancestor-child"])
    source = backend.create_repo(heads=["change"])

    pr_fields = [
        ("source_repo", source.repo_name),
        ("source_ref", f"branch:default:{commit_ids['change']}"),
        ("target_repo", target.repo_name),
        ("target_ref", f"branch:default:{commit_ids['ancestor-child']}"),
        ("common_ancestor", commit_ids["ancestor"]),
        ("pullrequest_title", "Title"),
        ("pullrequest_desc", "Description"),
        ("description_renderer", "markdown"),
    ]
    reviewer_fields = [
        ("__start__", "review_members:sequence"),
        ("__start__", "reviewer:mapping"),
        ("user_id", "2"),
        ("__start__", "reasons:sequence"),
        ("reason", "Some reason"),
        ("__end__", "reasons:sequence"),
        ("__start__", "rules:sequence"),
        ("__end__", "rules:sequence"),
        ("mandatory", "False"),
        ("__end__", "reviewer:mapping"),
        ("__end__", "review_members:sequence"),
    ]
    revision_fields = [
        ("__start__", "revisions:sequence"),
        ("revisions", commit_ids["change"]),
        ("__end__", "revisions:sequence"),
    ]
    trailing_fields = [("user", ""), ("csrf_token", csrf_token)]

    response = self.app.post(
        route_path("pullrequest_create", repo_name=source.repo_name),
        pr_fields + reviewer_fields + revision_fields + trailing_fields,
        status=302,
    )

    # the redirect target ends with the id of the freshly created PR
    pull_request_id = response.headers["Location"].rsplit("/", 1)[1]
    assert pull_request_id != "new"
    pull_request = PullRequest.get(int(pull_request_id))

    # Check that a notification was made
    expected_subject = "requested a pull request review. !%s" % pull_request_id
    notifications = Notification.query().filter(
        Notification.created_by == pull_request.author.user_id,
        Notification.type_ == Notification.TYPE_PULL_REQUEST,
        Notification.subject.contains(expected_subject),
    )
    assert len(notifications.all()) == 1

    # Change reviewers and check that a notification was made
    new_reviewers = [(1, [], False, "reviewer", [])]
    PullRequestModel().update_reviewers(
        pull_request.pull_request_id, new_reviewers, pull_request.author
    )
    # the same query is executed again and must now match two rows
    assert len(notifications.all()) == 2
@pytest.mark.xfail(reason="unable to fix this test after python3 migration")
def test_create_pull_request_stores_ancestor_commit_id(self, backend, csrf_token):
    """The stored target ref must point at the common ancestor commit.

    The pull request is created against ``ancestor-child`` with
    ``ancestor`` given as the common ancestor. The test expects the stored
    target ref to use the ancestor's commit id and the rendered diff to
    contain only the content added by ``change``.
    """
    commits = [
        {
            "message": "ancestor",
            "added": [FileNode(b"file_A", content=b"content_of_ancestor")],
        },
        {
            "message": "change",
            "added": [FileNode(b"file_a", content=b"content_of_change")],
        },
        {
            "message": "change-child",
            "added": [FileNode(b"file_c", content=b"content_of_change_2")],
        },
        {
            "message": "ancestor-child",
            "parents": ["ancestor"],
            "branch": "feature",
            "added": [FileNode(b"file_B", content=b"content_of_ancestor_child")],
        },
        {"message": "ancestor-child-2", "branch": "feature"},
    ]
    commit_ids = backend.create_master_repo(commits)
    target = backend.create_repo(heads=["ancestor-child"])
    source = backend.create_repo(heads=["change"])

    response = self.app.post(
        route_path("pullrequest_create", repo_name=source.repo_name),
        [
            ("source_repo", source.repo_name),
            ("source_ref", "branch:default:" + commit_ids["change"]),
            ("target_repo", target.repo_name),
            ("target_ref", "branch:default:" + commit_ids["ancestor-child"]),
            ("common_ancestor", commit_ids["ancestor"]),
            ("pullrequest_title", "Title"),
            ("pullrequest_desc", "Description"),
            ("description_renderer", "markdown"),
            ("__start__", "review_members:sequence"),
            ("__start__", "reviewer:mapping"),
            ("user_id", "1"),
            ("__start__", "reasons:sequence"),
            ("reason", "Some reason"),
            ("__end__", "reasons:sequence"),
            ("__start__", "rules:sequence"),
            ("__end__", "rules:sequence"),
            ("mandatory", "False"),
            ("__end__", "reviewer:mapping"),
            ("__end__", "review_members:sequence"),
            ("__start__", "revisions:sequence"),
            ("revisions", commit_ids["change"]),
            ("__end__", "revisions:sequence"),
            ("user", ""),
            ("csrf_token", csrf_token),
        ],
        status=302,
    )

    # the redirect target ends with the id of the freshly created PR
    location = response.headers["Location"]
    pull_request_id = location.rsplit("/", 1)[1]
    assert pull_request_id != "new"
    pull_request = PullRequest.get(int(pull_request_id))

    # target_ref has to point to the ancestor's commit_id in order to
    # show the correct diff
    expected_target_ref = "branch:default:" + commit_ids["ancestor"]
    assert pull_request.target_ref == expected_target_ref

    # Check generated diff contents
    response = response.follow()
    response.mustcontain(no=["content_of_ancestor"])
    # must match the fixture content above, which uses an underscore
    response.mustcontain(no=["content_of_ancestor_child"])
    response.mustcontain("content_of_change")
def test_merge_pull_request_enabled(self, pr_util, csrf_token):
    """Merge an approved PR and check its status, journal and push hook call."""
    # Clear any previous calls to rcextensions
    rhodecode.EXTENSIONS.calls.clear()

    pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
    pull_request_id = pull_request.pull_request_id
    repo_name = str(pull_request.target_repo.scm_instance().name)

    merge_url = route_path(
        "pullrequest_merge",
        repo_name=repo_name,
        pull_request_id=pull_request_id,
    )
    response = self.app.post(merge_url, params={"csrf_token": csrf_token}).follow()

    pull_request = PullRequest.get(pull_request_id)
    assert response.status_int == 200
    assert pull_request.is_closed()
    assert_pull_request_status(pull_request, ChangesetStatus.STATUS_APPROVED)

    # Check the relevant log entries were added (newest entry first)
    newest_first = UserLog.query().order_by(UserLog.user_log_id.desc())
    recent_actions = [entry.action for entry in newest_first.limit(3)]
    pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
    assert recent_actions == [
        "repo.pull_request.close",
        "repo.pull_request.merge",
        "repo.pull_request.comment.create",
    ]

    # the fourth newest entry has to be the push of the merged commits
    latest_four = UserLog.query().order_by(UserLog.user_log_id.desc()).limit(4)
    push_entry = list(latest_four)[-1]
    assert push_entry.action == "user.push"
    assert push_entry.action_data["commit_ids"] == pr_commit_ids

    # Check post_push rcextension was really executed
    push_calls = rhodecode.EXTENSIONS.calls["_push_hook"]
    assert len(push_calls) == 1
    _, hook_kwargs = push_calls[0]
    assert hook_kwargs["action"] == "push"
    assert hook_kwargs["commit_ids"] == pr_commit_ids
def test_merge_pull_request_disabled(self, pr_util, csrf_token):
    """
    Posting to the merge view of a pull request created with
    ``mergeable=False`` must render the "merging is disabled" failed check.
    """
    created = pr_util.create_pull_request(mergeable=False)
    pull_request = PullRequest.get(created.pull_request_id)

    merge_url = route_path(
        "pullrequest_merge",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.post(merge_url, params={"csrf_token": csrf_token}).follow()

    assert response.status_int == 200
    expected_messages = (
        "Merge is not currently possible because of below failed checks.",
        "Server-side pull request merging is disabled.",
    )
    for message in expected_messages:
        response.mustcontain(message)
@pytest.mark.skip_backends("svn")
def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
    """
    Posting to the merge view of a mergeable pull request that was not
    created as approved must render the "approval is pending" failed check.
    """
    pull_request = pr_util.create_pull_request(mergeable=True)
    pull_request_id = pull_request.pull_request_id
    repo_name = pull_request.target_repo.scm_instance().name

    merge_url = route_path(
        "pullrequest_merge",
        repo_name=repo_name,
        pull_request_id=pull_request_id,
    )
    response = self.app.post(merge_url, params={"csrf_token": csrf_token}).follow()

    assert response.status_int == 200
    expected_messages = (
        "Merge is not currently possible because of below failed checks.",
        "Pull request reviewer approval is pending.",
    )
    for message in expected_messages:
        response.mustcontain(message)
def test_merge_pull_request_renders_failure_reason(
    self, user_regular, csrf_token, pr_util
):
    """
    When the (mocked) merge reports ``PUSH_FAILED``, the flash message must
    be the status message of the merge response rather than the message
    returned by the mocked ``merge_status``.
    """
    pull_request = pr_util.create_pull_request(mergeable=True, approved=True)
    pull_request_id = pull_request.pull_request_id
    repo_name = pull_request.target_repo.scm_instance().name

    def push_failed_response(second_flag):
        # Only the second positional argument differs between the response
        # handed to the mocked model and the one used for the expected text.
        return MergeResponse(
            True,
            second_flag,
            Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"),
            MergeFailureReason.PUSH_FAILED,
            metadata={"target": "shadow repo", "merge_commit": "xxx"},
        )

    mocked_merge = mock.Mock(return_value=push_failed_response(False))
    mocked_status = mock.Mock(return_value=(None, True, "WRONG_MESSAGE"))
    merge_url = route_path(
        "pullrequest_merge",
        repo_name=repo_name,
        pull_request_id=pull_request_id,
    )

    with mock.patch.multiple(
        PullRequestModel,
        merge_repo=mocked_merge,
        merge_status=mocked_status,
    ):
        response = self.app.post(
            merge_url,
            params={"csrf_token": csrf_token},
            status=302,
        )

    expected_response = push_failed_response(True)
    assert_session_flash(response, expected_response.merge_status_message)
def test_update_source_revision(self, backend, csrf_token):
    """
    Pull an additional commit into the source repository, update the pull
    request and check that the new commit was added to its revisions.
    """
    commit_ids = backend.create_master_repo(
        [
            {"message": "ancestor"},
            {"message": "change"},
            {"message": "change-2"},
        ]
    )
    target = backend.create_repo(heads=["ancestor"])
    source = backend.create_repo(heads=["change"])
    ref_template = "branch:{branch}:{commit_id}"

    # Pull request from "change" in source onto "ancestor" in target
    pull_request = PullRequest()
    pull_request.source_repo = source
    pull_request.source_ref = ref_template.format(
        branch=backend.default_branch_name, commit_id=commit_ids["change"]
    )
    pull_request.target_repo = target
    pull_request.target_ref = ref_template.format(
        branch=backend.default_branch_name, commit_id=commit_ids["ancestor"]
    )
    pull_request.revisions = [commit_ids["change"]]
    pull_request.title = "Test"
    pull_request.description = "Description"
    pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    pull_request.pull_request_state = PullRequest.STATE_CREATED
    Session().add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    # Bring "change-2" into the source repository
    backend.pull_heads(source, heads=["change-2"])
    target_repo_name = target.repo_name

    # Ask the server to update the pull request commits
    update_url = route_path(
        "pullrequest_update",
        repo_name=target_repo_name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        update_url,
        params={"update_commits": "true", "csrf_token": csrf_token},
    )

    show_url = route_path(
        "pullrequest_show",
        repo_name=target_repo_name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200
    for fragment in (
        "Pull request updated to",
        "with 1 added, 0 removed commits.",
    ):
        response.mustcontain(fragment)

    # Both the new and the original commit are part of the pull request now
    pull_request = PullRequest.get(pull_request_id)
    expected_revisions = [commit_ids["change-2"], commit_ids["change"]]
    assert pull_request.revisions == expected_revisions
def test_update_target_revision(self, backend, csrf_token):
    """
    Checks when we add more commits into a target branch, and update PR.

    After pulling ``commit-b`` into the target and ``commit-c`` into the
    source, the updated pull request must hold only ``commit-c`` and its
    target ref must point at ``commit-b``.
    """
    commit_ids = backend.create_master_repo(
        [
            {"message": "commit-a"},  # initial pull request target
            {"message": "commit-b"},  # initial pull request source
            {"message": "commit-c"},
            {
                "message": "commit-a-prime",
                "branch": "feature",
                "parents": ["commit-a"],
            },
        ]
    )
    target = backend.create_repo(heads=["commit-a"])
    source = backend.create_repo(heads=["commit-b"])
    target_repo_name = target.repo_name
    ref_template = "branch:{branch}:{commit_id}"

    # Pull request from commit-b (source) onto commit-a (target)
    pull_request = PullRequest()
    pull_request.target_repo = target
    pull_request.target_ref = ref_template.format(
        branch=backend.default_branch_name, commit_id=commit_ids["commit-a"]
    )
    pull_request.source_repo = source
    pull_request.source_ref = ref_template.format(
        branch=backend.default_branch_name, commit_id=commit_ids["commit-b"]
    )
    pull_request.revisions = [commit_ids["commit-b"]]
    pull_request.title = "Test"
    pull_request.description = "Description"
    pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    pull_request.pull_request_state = PullRequest.STATE_CREATED
    Session().add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    # Pull commit-b into the target repository ...
    backend.pull_heads(target, heads=["commit-b"])
    # ... and commit-c into the source repository
    backend.pull_heads(source, heads=["commit-c"])

    # Ask the server to update the pull request commits
    update_url = route_path(
        "pullrequest_update",
        repo_name=target_repo_name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        update_url,
        params={"update_commits": "true", "csrf_token": csrf_token},
        status=200,
    )

    # Only commit-c is left, and the target ref moved to commit-b
    pull_request = PullRequest.get(pull_request_id)
    assert pull_request.revisions == [commit_ids["commit-c"]]
    expected_target_ref = ref_template.format(
        branch=backend.default_branch_name, commit_id=commit_ids["commit-b"]
    )
    assert pull_request.target_ref == expected_target_ref

    show_url = route_path(
        "pullrequest_show",
        repo_name=target_repo_name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200
    for fragment in (
        "Pull request updated to",
        "with 1 added, 1 removed commits.",
    ):
        response.mustcontain(fragment)
def test_update_target_revision_with_removal_of_1_commit_git(
    self, backend_git, csrf_token
):
    """
    Reset the git target repository back by two commits (simulating a
    force-push) and update the pull request; the update is expected to
    report no added and no removed commits.
    """
    backend = backend_git
    commit_ids = backend.create_master_repo(
        [
            {"message": "master-commit-1"},
            {"message": "master-commit-2-change-1"},
            {"message": "master-commit-3-change-2"},
            {
                "message": "feat-commit-1",
                "parents": ["master-commit-1"],
                "branch": "feature",
            },
            {"message": "feat-commit-2", "branch": "feature"},
        ]
    )
    target = backend.create_repo(heads=["master-commit-3-change-2"])
    source = backend.create_repo(heads=["feat-commit-2"])
    ref_template = "branch:{branch}:{commit_id}"

    # NOTE(review): source_ref points at "master-commit-3-change-2" and
    # target_ref at "feat-commit-2", while the target repository was created
    # from "master-commit-3-change-2" and the source from "feat-commit-2" --
    # the refs look swapped; confirm this is intended before changing it.
    pull_request = PullRequest()
    pull_request.source_repo = source
    pull_request.source_ref = ref_template.format(
        branch=backend.default_branch_name,
        commit_id=commit_ids["master-commit-3-change-2"],
    )
    pull_request.target_repo = target
    pull_request.target_ref = ref_template.format(
        branch=backend.default_branch_name, commit_id=commit_ids["feat-commit-2"]
    )
    pull_request.revisions = [
        commit_ids[message] for message in ("feat-commit-1", "feat-commit-2")
    ]
    pull_request.title = "Test"
    pull_request.description = "Description"
    pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    pull_request.pull_request_state = PullRequest.STATE_CREATED
    Session().add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    # Simulate a force-push into the target that drops its last two commits;
    # the "hooks" config section is cleared before running the git command.
    vcsrepo = target.scm_instance()
    vcsrepo.config.clear_section("hooks")
    vcsrepo.run_git_command(["reset", "--soft", "HEAD~2"])
    target_repo_name = target.repo_name

    # Ask the server to update the pull request commits
    update_url = route_path(
        "pullrequest_update",
        repo_name=target_repo_name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        update_url,
        params={"update_commits": "true", "csrf_token": csrf_token},
        status=200,
    )

    # The update message is checked on the "new pull request" page
    # (presumably rendered there as a pending flash message -- TODO confirm)
    response = self.app.get(
        route_path("pullrequest_new", repo_name=target_repo_name)
    )
    assert response.status_int == 200
    for fragment in (
        "Pull request updated to",
        "with 0 added, 0 removed commits.",
    ):
        response.mustcontain(fragment)
@pytest.mark.xfail(reason="unable to fix after pygit update")
def test_update_pr_ancestor_reference(self, csrf_token, pr_util: PRTestUtility):
    """
    Re-point both sides of a pull request to the "feature" branch and
    update it; the stored target ref and revisions are expected to follow.

    Currently marked as an expected failure, see the ``xfail`` reason.
    """
    commits = [
        {"message": "ancestor"},
        {"message": "change"},
        {"message": "change-2"},
        {"message": "ancestor-new", "parents": ["ancestor"], "branch": "feature"},
        {"message": "change-rebased", "branch": "feature"},
    ]
    pull_request = pr_util.create_pull_request(
        commits,
        target_head="ancestor",
        source_head="change",
        revisions=["change"],
    )
    pull_request_id = pull_request.pull_request_id
    target_repo_name = pr_util.target_repository.repo_name
    commit_ids = pr_util.commit_ids
    default_branch = pr_util.backend.default_branch_name
    branch = "feature"

    def branch_names(repo):
        # Read the branch names from an uncached scm instance
        return list(repo.scm_instance(cache=False).branches.keys())

    # Initially both repositories only know the default branch
    assert pull_request.revisions == [commit_ids["change"]]
    assert branch_names(pull_request.target_repo) == [default_branch]
    assert branch_names(pull_request.source_repo) == [default_branch]

    # Move the target and the source of the pull request to the feature branch
    pr_util.update_target_repository(head="ancestor-new", do_fetch=True)
    pr_util.set_pr_target_ref(
        ref_type="branch",
        ref_name=branch,
        ref_commit_id=commit_ids["ancestor-new"],
    )
    pr_util.update_source_repository(head="change-rebased", do_fetch=True)
    pr_util.set_pr_source_ref(
        ref_type="branch",
        ref_name=branch,
        ref_commit_id=commit_ids["change-rebased"],
    )

    expected_branches = [default_branch, branch]
    assert branch_names(pull_request.target_repo) == expected_branches
    assert branch_names(pull_request.source_repo) == expected_branches

    Session().add(pr_util.pull_request)
    Session().commit()

    update_url = route_path(
        "pullrequest_update",
        repo_name=target_repo_name,
        pull_request_id=pull_request_id,
    )
    self.app.post(
        update_url,
        params={
            "update_commits": "true",
            "csrf_token": csrf_token,
            "force_refresh": True,
        },
        status=200,
    )

    # NOTE: a follow-up GET of "pullrequest_show" (with force_refresh) that
    # checked for "Pull request updated to" and
    # "with 1 added, 0 removed commits." is currently disabled here.

    pull_request = PullRequest.get(pull_request_id)
    expected_target_ref = "branch:{branch}:{commit_id}".format(
        branch="feature", commit_id=commit_ids["ancestor-new"]
    )
    assert pull_request.target_ref == expected_target_ref
    assert pull_request.revisions == [commit_ids["change-rebased"]]
def test_remove_pull_request_branch(self, backend_git, csrf_token):
    """
    Remove the source branch of a pull request from the git repository and
    check that the pull request page renders a "Missing commits" alert
    instead of failing.
    """
    branch_name = "development"
    repo = backend_git.create_repo(
        [
            {"message": "initial-commit"},
            {"message": "old-feature"},
            {"message": "new-feature", "branch": branch_name},
        ]
    )
    repo_name = repo.repo_name
    commit_ids = backend_git.commit_ids
    ref_template = "branch:{branch}:{commit_id}"

    # Pull request inside a single repository: development -> default branch
    pull_request = PullRequest()
    pull_request.source_repo = repo
    pull_request.target_repo = repo
    pull_request.source_ref = ref_template.format(
        branch=branch_name, commit_id=commit_ids["new-feature"]
    )
    pull_request.target_ref = ref_template.format(
        branch=backend_git.default_branch_name, commit_id=commit_ids["old-feature"]
    )
    pull_request.revisions = [commit_ids["new-feature"]]
    pull_request.title = "Test"
    pull_request.description = "Description"
    pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    pull_request.pull_request_state = PullRequest.STATE_CREATED
    Session().add(pull_request)
    Session().commit()
    pull_request_id = pull_request.pull_request_id

    vcs = repo.scm_instance()
    vcs.remove_ref("refs/heads/{}".format(branch_name))
    # NOTE(marcink): run GC to ensure the commits are gone
    vcs.run_gc()

    show_url = route_path(
        "pullrequest_show", repo_name=repo_name, pull_request_id=pull_request_id
    )
    response = self.app.get(show_url)
    assert response.status_int == 200

    alert_selector = "#changeset_compare_view_content .alert"
    response.assert_response().element_contains(
        alert_selector + " strong", "Missing commits"
    )
    response.assert_response().element_contains(
        alert_selector,
        "This pull request cannot be displayed, because one or more"
        " commits no longer exist in the source repository.",
    )
def test_strip_commits_from_pull_request(self, backend, pr_util):
    """
    Strip the only commit of a pull request from the source repository and
    check that the pull request page shows the "Missing commits" alert
    together with the "Update commits" control.
    """
    pull_request = pr_util.create_pull_request(
        [
            {"message": "initial-commit"},
            {"message": "old-feature"},
            {"message": "new-feature"},
        ],
        target_head="initial-commit",
        source_head="new-feature",
        revisions=["new-feature"],
    )

    vcs = pr_util.source_repository.scm_instance()
    # Only the git strip call is given the branch name
    strip_kwargs = {}
    if backend.alias == "git":
        strip_kwargs["branch_name"] = pr_util.backend.default_branch_name
    vcs.strip(pr_util.commit_ids["new-feature"], **strip_kwargs)

    show_url = route_path(
        "pullrequest_show",
        repo_name=pr_util.target_repository.repo_name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200

    alert_selector = "#changeset_compare_view_content .alert"
    response.assert_response().element_contains(
        alert_selector + " strong", "Missing commits"
    )
    response.assert_response().element_contains(
        alert_selector,
        "This pull request cannot be displayed, because one or more"
        " commits no longer exist in the source repository.",
    )
    response.assert_response().element_contains("#update_commits", "Update commits")
def test_strip_commits_and_update(self, backend, pr_util, csrf_token):
    """
    Strip the only commit of a pull request from the source repository,
    update the pull request and check that neither the update nor the
    following page view fails.

    The update view must answer with a JSON body signalling success and
    the pull request page must show the "Missing commits" alert.
    """
    commits = [
        {"message": "initial-commit"},
        {"message": "old-feature"},
        {"message": "new-feature", "parents": ["old-feature"]},
    ]
    pull_request = pr_util.create_pull_request(
        commits,
        target_head="old-feature",
        source_head="new-feature",
        revisions=["new-feature"],
        mergeable=True,
    )
    pr_id = pull_request.pull_request_id
    target_repo_name = pull_request.target_repo.repo_name

    vcs = pr_util.source_repository.scm_instance()
    if backend.alias == "git":
        # Use the backend's default branch name instead of a hard-coded
        # "master", matching test_strip_commits_from_pull_request.
        vcs.strip(
            pr_util.commit_ids["new-feature"],
            branch_name=pr_util.backend.default_branch_name,
        )
    else:
        vcs.strip(pr_util.commit_ids["new-feature"])

    url = route_path(
        "pullrequest_update", repo_name=target_repo_name, pull_request_id=pr_id
    )
    response = self.app.post(
        url, params={"update_commits": "true", "csrf_token": csrf_token}
    )
    assert response.status_int == 200
    assert json.loads(response.body) == json.loads(
        '{"response": true, "redirect_url": null}'
    )

    # Make sure that after update, it won't raise 500 errors
    response = self.app.get(
        route_path(
            "pullrequest_show", repo_name=target_repo_name, pull_request_id=pr_id
        )
    )
    assert response.status_int == 200
    response.assert_response().element_contains(
        "#changeset_compare_view_content .alert strong", "Missing commits"
    )
def test_branch_is_a_link(self, pr_util):
    """
    For branch refs, the parents of the source and target info elements
    must carry an ``href`` to the commits page of the respective repository
    filtered by that branch.
    """
    pull_request = pr_util.create_pull_request()
    pull_request.source_ref = "branch:origin:1234567890abcdef"
    pull_request.target_ref = "branch:target:abcdef1234567890"
    Session().add(pull_request)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200

    source_parent = (
        response.assert_response().get_element(".pr-source-info").getparent()
    )
    assert len(source_parent) == 1
    target_parent = (
        response.assert_response().get_element(".pr-target-info").getparent()
    )
    assert len(target_parent) == 1

    expected_origin_link = route_path(
        "repo_commits",
        repo_name=pull_request.source_repo.scm_instance().name,
        params={"branch": "origin"},
    )
    expected_target_link = route_path(
        "repo_commits",
        repo_name=pull_request.target_repo.scm_instance().name,
        params={"branch": "target"},
    )
    assert source_parent.attrib["href"] == expected_origin_link
    assert target_parent.attrib["href"] == expected_target_link
def test_bookmark_is_not_a_link(self, pr_util):
    """
    For bookmark refs, the source and target info elements show
    ``bookmark:<name>`` and their parents carry no ``href`` attribute.
    """
    pull_request = pr_util.create_pull_request()
    pull_request.source_ref = "bookmark:origin:1234567890abcdef"
    pull_request.target_ref = "bookmark:target:abcdef1234567890"
    Session().add(pull_request)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200

    expectations = (
        (".pr-source-info", "bookmark:origin"),
        (".pr-target-info", "bookmark:target"),
    )
    for selector, expected_text in expectations:
        element = response.assert_response().get_element(selector)
        assert element.text.strip() == expected_text
        assert element.getparent().attrib.get("href") is None
def test_tag_is_not_a_link(self, pr_util):
    """
    For tag refs, the source and target info elements show ``tag:<name>``
    and their parents carry no ``href`` attribute.
    """
    pull_request = pr_util.create_pull_request()
    pull_request.source_ref = "tag:origin:1234567890abcdef"
    pull_request.target_ref = "tag:target:abcdef1234567890"
    Session().add(pull_request)
    Session().commit()

    show_url = route_path(
        "pullrequest_show",
        repo_name=pull_request.target_repo.scm_instance().name,
        pull_request_id=pull_request.pull_request_id,
    )
    response = self.app.get(show_url)
    assert response.status_int == 200

    expectations = (
        (".pr-source-info", "tag:origin"),
        (".pr-target-info", "tag:target"),
    )
    for selector, expected_text in expectations:
        element = response.assert_response().get_element(selector)
        assert element.text.strip() == expected_text
        assert element.getparent().attrib.get("href") is None
@pytest.mark.parametrize("mergeable", [True, False])
def test_shadow_repository_link(self, mergeable, pr_util, http_host_only_stub):
    """
    Check that the pull request summary page displays a link to the shadow
    repository if the pull request is mergeable. If it is not mergeable
    the link should not be displayed.
    """
    pull_request = pr_util.create_pull_request(
        mergeable=mergeable, enable_notifications=False
    )
    target_repo = pull_request.target_repo.scm_instance()
    pr_id = pull_request.pull_request_id
    shadow_url = "{host}/{repo}/pull-request/{pr_id}/repository".format(
        host=http_host_only_stub, repo=target_repo.name, pr_id=pr_id
    )

    show_url = route_path(
        "pullrequest_show", repo_name=target_repo.name, pull_request_id=pr_id
    )
    response = self.app.get(show_url)

    if not mergeable:
        # No merge info element at all for non-mergeable pull requests
        response.assert_response().no_element_exists(".pr-mergeinfo")
        return

    response.assert_response().element_value_contains(
        "input.pr-mergeinfo", shadow_url
    )
    response.assert_response().element_value_contains(
        "input.pr-mergeinfo ", "pr-merge"
    )
@pytest.mark.usefixtures("app")
@pytest.mark.backends("git", "hg")
class TestPullrequestsControllerDelete(object):
    """
    Tests for the delete controls on the pull request page, for deleting
    pull request comments, and for pull request routes of archived
    repositories.
    """

    def _show_pull_request(self, pull_request):
        """GET the show page of *pull_request* and return the response."""
        show_url = route_path(
            "pullrequest_show",
            repo_name=pull_request.target_repo.scm_instance().name,
            pull_request_id=pull_request.pull_request_id,
        )
        return self.app.get(show_url)

    def test_pull_request_delete_button_permissions_admin(
        self, autologin_user, user_admin, pr_util
    ):
        """The delete control and its confirmation text are rendered here."""
        pull_request = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )
        response = self._show_pull_request(pull_request)
        for fragment in (
            'id="delete_pullrequest"',
            "Confirm to delete this pull request",
        ):
            response.mustcontain(fragment)

    def test_pull_request_delete_button_permissions_owner(
        self, autologin_regular_user, user_regular, pr_util
    ):
        """The regular user authoring the pull request gets the delete control."""
        pull_request = pr_util.create_pull_request(
            author=user_regular.username, enable_notifications=False
        )
        response = self._show_pull_request(pull_request)
        for fragment in (
            'id="delete_pullrequest"',
            "Confirm to delete this pull request",
        ):
            response.mustcontain(fragment)

    def test_pull_request_delete_button_permissions_forbidden(
        self, autologin_regular_user, user_regular, user_admin, pr_util
    ):
        """No delete control is rendered on a pull request authored by the admin."""
        pull_request = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )
        response = self._show_pull_request(pull_request)
        response.mustcontain(no=['id="delete_pullrequest"'])
        response.mustcontain(no=["Confirm to delete this pull request"])

    def test_pull_request_delete_button_permissions_can_update_cannot_delete(
        self, autologin_regular_user, user_regular, user_admin, pr_util, user_util
    ):
        """
        With ``repository.write`` granted on the target repository, the edit
        control and the delete element are rendered, but the delete
        confirmation text is not.
        """
        pull_request = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )
        user_util.grant_user_permission_to_repo(
            pull_request.target_repo, user_regular, "repository.write"
        )
        response = self._show_pull_request(pull_request)
        response.mustcontain('id="open_edit_pullrequest"')
        response.mustcontain('id="delete_pullrequest"')
        response.mustcontain(no=["Confirm to delete this pull request"])

    def test_delete_comment_returns_404_if_comment_does_not_exist(
        self, autologin_user, pr_util, user_admin, csrf_token, xhr_header
    ):
        """Deleting a comment id that was never created must answer 404."""
        pull_request = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )
        delete_url = route_path(
            "pullrequest_comment_delete",
            repo_name=pull_request.target_repo.scm_instance().name,
            pull_request_id=pull_request.pull_request_id,
            comment_id=1024404,
        )
        self.app.post(
            delete_url,
            extra_environ=xhr_header,
            params={"csrf_token": csrf_token},
            status=404,
        )

    def test_delete_comment(
        self, autologin_user, pr_util, user_admin, csrf_token, xhr_header
    ):
        """Deleting an existing comment answers 200 with the body ``true``."""
        pull_request = pr_util.create_pull_request(
            author=user_admin.username, enable_notifications=False
        )
        comment_id = pr_util.create_comment().comment_id
        delete_url = route_path(
            "pullrequest_comment_delete",
            repo_name=pull_request.target_repo.scm_instance().name,
            pull_request_id=pull_request.pull_request_id,
            comment_id=comment_id,
        )
        response = self.app.post(
            delete_url,
            extra_environ=xhr_header,
            params={"csrf_token": csrf_token},
            status=200,
        )
        assert response.text == "true"

    @pytest.mark.parametrize(
        "url_type",
        [
            "pullrequest_new",
            "pullrequest_create",
            "pullrequest_update",
            "pullrequest_merge",
        ],
    )
    def test_pull_request_is_forbidden_on_archived_repo(
        self, autologin_user, backend, xhr_header, user_util, url_type
    ):
        """
        A GET on each pull request route of an archived repository must
        redirect (302) and flash the "not supported" message.
        """
        # Create a temporary repository and flag it as archived
        repo_name = user_util.create_repo(repo_type=backend.alias).repo_name
        archived_repo = Repository.get_by_repo_name(repo_name)
        archived_repo.archived = True
        Session().commit()

        url = route_path(url_type, repo_name=repo_name, pull_request_id=1)
        response = self.app.get(url, status=302)

        assert_session_flash(
            response, "Action not supported for archived repository."
        )
def assert_pull_request_status(pull_request, expected_status):
    """
    Assert that the calculated review status of *pull_request* equals
    *expected_status*.
    """
    status_model = ChangesetStatusModel()
    calculated = status_model.calculated_review_status(pull_request=pull_request)
    assert calculated == expected_status
@pytest.mark.parametrize("route", ["pullrequest_new", "pullrequest_create"])
@pytest.mark.usefixtures("autologin_user")
def test_forbidde_to_repo_summary_for_svn_repositories(backend_svn, app, route):
    """A GET on the given pull request route of an SVN repository must answer 404."""
    url = route_path(route, repo_name=backend_svn.repo_name)
    app.get(url, status=404)