##// END OF EJS Templates
vcs: Minimal change to expose the shadow repository...
vcs: Minimal change to expose the shadow repository Based on my original research, this was the "minimal" starting point. It shows that three concepts are needed for the "repo_name": * From the security standpoint we think of the shadow repository having the same ACL as the target repository of the pull request. This is because the pull request itself is considered to be a part of the target repository. Out of this thought, the variable "acl_repo_name" is used whenever we want to check permissions or when we need the database configuration of the repository. An alternative name would have been "db_repo_name", but the usage for ACL checking is the most important one. * From the web interaction perspective, we need the URL which was originally used to get to the repository. This is because based on this base URL commands can be identified. Especially for Git this is important, so that the commands are correctly recognized. Since the URL is in the focus, this is called "url_repo_name". * Finally we have to deal with the repository on the file system. This is what the VCS layer deal with normally, so this name is called "vcs_repo_name". The original repository interaction is a special case where all three names are the same. When interacting with a pull request, these three names are typically all different. This change is minimal in a sense that it just makes the interaction with a shadow repository barely work, without checking any special constraints yet. This was the starting point for further work on this topic.

File last commit:

r873:930d1a1f default
r887:175782be default
Show More
test_pullrequests.py
940 lines | 39.2 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import mock
import pytest
from webob.exc import HTTPNotFound
import rhodecode
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.db import (
PullRequest, ChangesetStatus, UserLog, Notification)
from rhodecode.model.meta import Session
from rhodecode.model.pull_request import PullRequestModel
from rhodecode.model.user import UserModel
from rhodecode.tests import assert_session_flash, url, TEST_USER_ADMIN_LOGIN
from rhodecode.tests.utils import AssertResponse
@pytest.mark.usefixtures('app', 'autologin_user')
@pytest.mark.backends("git", "hg")
class TestPullrequestsController:
def test_index(self, backend):
self.app.get(url(
controller='pullrequests', action='index',
repo_name=backend.repo_name))
def test_option_menu_create_pull_request_exists(self, backend):
repo_name = backend.repo_name
response = self.app.get(url('summary_home', repo_name=repo_name))
create_pr_link = '<a href="%s">Create Pull Request</a>' % url(
'pullrequest', repo_name=repo_name)
response.mustcontain(create_pr_link)
def test_global_redirect_of_pr(self, backend, pr_util):
pull_request = pr_util.create_pull_request()
response = self.app.get(
url('pull_requests_global',
pull_request_id=pull_request.pull_request_id))
repo_name = pull_request.target_repo.repo_name
redirect_url = url('pullrequest_show', repo_name=repo_name,
pull_request_id=pull_request.pull_request_id)
assert response.status == '302 Found'
assert redirect_url in response.location
def test_create_pr_form_with_raw_commit_id(self, backend):
repo = backend.repo
self.app.get(
url(controller='pullrequests', action='index',
repo_name=repo.repo_name,
commit=repo.get_commit().raw_id),
status=200)
@pytest.mark.parametrize('pr_merge_enabled', [True, False])
def test_show(self, pr_util, pr_merge_enabled):
pull_request = pr_util.create_pull_request(
mergeable=pr_merge_enabled, enable_notifications=False)
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)))
for commit_id in pull_request.revisions:
response.mustcontain(commit_id)
assert pull_request.target_ref_parts.type in response
assert pull_request.target_ref_parts.name in response
target_clone_url = pull_request.target_repo.clone_url()
assert target_clone_url in response
assert 'class="pull-request-merge"' in response
assert (
'Server-side pull request merging is disabled.'
in response) != pr_merge_enabled
def test_close_status_visibility(self, pr_util, csrf_token):
from rhodecode.tests.functional.test_login import login_url, logut_url
# Logout
response = self.app.post(
logut_url,
params={'csrf_token': csrf_token})
# Login as regular user
response = self.app.post(login_url,
{'username': 'test_regular',
'password': 'test12'})
pull_request = pr_util.create_pull_request(author='test_regular')
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)))
assert 'Server-side pull request merging is disabled.' in response
assert 'value="forced_closed"' in response
def test_show_invalid_commit_id(self, pr_util):
# Simulating invalid revisions which will cause a lookup error
pull_request = pr_util.create_pull_request()
pull_request.revisions = ['invalid']
Session().add(pull_request)
Session().commit()
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)))
for commit_id in pull_request.revisions:
response.mustcontain(commit_id)
def test_show_invalid_source_reference(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:b:invalid'
Session().add(pull_request)
Session().commit()
self.app.get(url(
controller='pullrequests', action='show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)))
def test_edit_title_description(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
url(controller='pullrequests', action='update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=str(pull_request_id)),
params={
'edit_pull_request': 'true',
'_method': 'put',
'title': 'New title',
'description': 'New description',
'csrf_token': csrf_token})
assert_session_flash(
response, u'Pull request title & description updated.',
category='success')
pull_request = PullRequest.get(pull_request_id)
assert pull_request.title == 'New title'
assert pull_request.description == 'New description'
def test_edit_title_description_closed(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
pr_util.close()
response = self.app.post(
url(controller='pullrequests', action='update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=str(pull_request_id)),
params={
'edit_pull_request': 'true',
'_method': 'put',
'title': 'New title',
'description': 'New description',
'csrf_token': csrf_token})
assert_session_flash(
response, u'Cannot update closed pull requests.',
category='error')
def test_update_invalid_source_reference(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:invalid-branch:invalid-commit-id'
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
url(controller='pullrequests', action='update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=str(pull_request_id)),
params={'update_commits': 'true', '_method': 'put',
'csrf_token': csrf_token})
assert_session_flash(
response, u'Update failed due to missing commits.',
category='error')
def test_comment_and_close_pull_request(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(approved=True)
pull_request_id = pull_request.pull_request_id
author = pull_request.user_id
repo = pull_request.target_repo.repo_id
self.app.post(
url(controller='pullrequests',
action='comment',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request_id)),
params={
'changeset_status':
ChangesetStatus.STATUS_APPROVED + '_closed',
'change_changeset_status': 'on',
'text': '',
'csrf_token': csrf_token},
status=302)
action = 'user_closed_pull_request:%d' % pull_request_id
journal = UserLog.query()\
.filter(UserLog.user_id == author)\
.filter(UserLog.repository_id == repo)\
.filter(UserLog.action == action)\
.all()
assert len(journal) == 1
def test_reject_and_close_pull_request(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
url(controller='pullrequests',
action='update',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)),
params={'close_pull_request': 'true', '_method': 'put',
'csrf_token': csrf_token})
pull_request = PullRequest.get(pull_request_id)
assert response.json is True
assert pull_request.is_closed()
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
pull_request.source_repo, pull_request=pull_request)
assert status == ChangesetStatus.STATUS_REJECTED
def test_comment_force_close_pull_request(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
reviewers_data = [(1, ['reason']), (2, ['reason2'])]
PullRequestModel().update_reviewers(pull_request_id, reviewers_data)
author = pull_request.user_id
repo = pull_request.target_repo.repo_id
self.app.post(
url(controller='pullrequests',
action='comment',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request_id)),
params={
'changeset_status': 'forced_closed',
'csrf_token': csrf_token},
status=302)
pull_request = PullRequest.get(pull_request_id)
action = 'user_closed_pull_request:%d' % pull_request_id
journal = UserLog.query().filter(
UserLog.user_id == author,
UserLog.repository_id == repo,
UserLog.action == action).all()
assert len(journal) == 1
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
pull_request.source_repo, pull_request=pull_request)
assert status == ChangesetStatus.STATUS_REJECTED
def test_create_pull_request(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
response = self.app.post(
url(
controller='pullrequests',
action='create',
repo_name=source.repo_name
),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor']),
('pullrequest_desc', 'Description'),
('pullrequest_title', 'Title'),
('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '1'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('revisions', commit_ids['change']),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = int(location.rsplit('/', 1)[1])
pull_request = PullRequest.get(pull_request_id)
# check that we have now both revisions
assert pull_request.revisions == [commit_ids['change']]
assert pull_request.source_ref == 'branch:default:' + commit_ids['change']
expected_target_ref = 'branch:default:' + commit_ids['ancestor']
assert pull_request.target_ref == expected_target_ref
def test_reviewer_notifications(self, backend, csrf_token):
# We have to use the app.post for this test so it will create the
# notifications properly with the new PR
commits = [
{'message': 'ancestor',
'added': [FileNode('file_A', content='content_of_ancestor')]},
{'message': 'change',
'added': [FileNode('file_a', content='content_of_change')]},
{'message': 'change-child'},
{'message': 'ancestor-child', 'parents': ['ancestor'],
'added': [
FileNode('file_B', content='content_of_ancestor_child')]},
{'message': 'ancestor-child-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor-child'])
source = backend.create_repo(heads=['change'])
response = self.app.post(
url(
controller='pullrequests',
action='create',
repo_name=source.repo_name
),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor-child']),
('pullrequest_desc', 'Description'),
('pullrequest_title', 'Title'),
('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '2'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('revisions', commit_ids['change']),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = int(location.rsplit('/', 1)[1])
pull_request = PullRequest.get(pull_request_id)
# Check that a notification was made
notifications = Notification.query()\
.filter(Notification.created_by == pull_request.author.user_id,
Notification.type_ == Notification.TYPE_PULL_REQUEST,
Notification.subject.contains("wants you to review "
"pull request #%d"
% pull_request_id))
assert len(notifications.all()) == 1
# Change reviewers and check that a notification was made
PullRequestModel().update_reviewers(
pull_request.pull_request_id, [(1, [])])
assert len(notifications.all()) == 2
def test_create_pull_request_stores_ancestor_commit_id(self, backend,
csrf_token):
commits = [
{'message': 'ancestor',
'added': [FileNode('file_A', content='content_of_ancestor')]},
{'message': 'change',
'added': [FileNode('file_a', content='content_of_change')]},
{'message': 'change-child'},
{'message': 'ancestor-child', 'parents': ['ancestor'],
'added': [
FileNode('file_B', content='content_of_ancestor_child')]},
{'message': 'ancestor-child-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor-child'])
source = backend.create_repo(heads=['change'])
response = self.app.post(
url(
controller='pullrequests',
action='create',
repo_name=source.repo_name
),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor-child']),
('pullrequest_desc', 'Description'),
('pullrequest_title', 'Title'),
('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '1'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('revisions', commit_ids['change']),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = int(location.rsplit('/', 1)[1])
pull_request = PullRequest.get(pull_request_id)
# target_ref has to point to the ancestor's commit_id in order to
# show the correct diff
expected_target_ref = 'branch:default:' + commit_ids['ancestor']
assert pull_request.target_ref == expected_target_ref
# Check generated diff contents
response = response.follow()
assert 'content_of_ancestor' not in response.body
assert 'content_of_ancestor-child' not in response.body
assert 'content_of_change' in response.body
def test_merge_pull_request_enabled(self, pr_util, csrf_token):
# Clear any previous calls to rcextensions
rhodecode.EXTENSIONS.calls.clear()
pull_request = pr_util.create_pull_request(
approved=True, mergeable=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name,
response = self.app.post(
url(controller='pullrequests',
action='merge',
repo_name=str(repo_name[0]),
pull_request_id=str(pull_request_id)),
params={'csrf_token': csrf_token}).follow()
pull_request = PullRequest.get(pull_request_id)
assert response.status_int == 200
assert pull_request.is_closed()
assert_pull_request_status(
pull_request, ChangesetStatus.STATUS_APPROVED)
# Check the relevant log entries were added
user_logs = UserLog.query().order_by('-user_log_id').limit(4)
actions = [log.action for log in user_logs]
pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
expected_actions = [
u'user_closed_pull_request:%d' % pull_request_id,
u'user_merged_pull_request:%d' % pull_request_id,
# The action below reflect that the post push actions were executed
u'user_commented_pull_request:%d' % pull_request_id,
u'push:%s' % ','.join(pr_commit_ids),
]
assert actions == expected_actions
# Check post_push rcextension was really executed
push_calls = rhodecode.EXTENSIONS.calls['post_push']
assert len(push_calls) == 1
unused_last_call_args, last_call_kwargs = push_calls[0]
assert last_call_kwargs['action'] == 'push'
assert last_call_kwargs['pushed_revs'] == pr_commit_ids
def test_merge_pull_request_disabled(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(mergeable=False)
pull_request_id = pull_request.pull_request_id
pull_request = PullRequest.get(pull_request_id)
response = self.app.post(
url(controller='pullrequests',
action='merge',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)),
params={'csrf_token': csrf_token}).follow()
assert response.status_int == 200
assert 'Server-side pull request merging is disabled.' in response.body
@pytest.mark.skip_backends('svn')
def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(mergeable=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name,
response = self.app.post(
url(controller='pullrequests',
action='merge',
repo_name=str(repo_name[0]),
pull_request_id=str(pull_request_id)),
params={'csrf_token': csrf_token}).follow()
pull_request = PullRequest.get(pull_request_id)
assert response.status_int == 200
assert ' Reviewer approval is pending.' in response.body
def test_update_source_revision(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'change-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name, commit_id=commit_ids['change'])
pull_request.target_repo = target
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# source has ancestor - change - change-2
backend.pull_heads(source, heads=['change-2'])
# update PR
self.app.post(
url(controller='pullrequests', action='update',
repo_name=target.repo_name,
pull_request_id=str(pull_request_id)),
params={'update_commits': 'true', '_method': 'put',
'csrf_token': csrf_token})
# check that we have now both revisions
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [
commit_ids['change-2'], commit_ids['change']]
# TODO: johbo: this should be a test on its own
response = self.app.get(url(
controller='pullrequests', action='index',
repo_name=target.repo_name))
assert response.status_int == 200
assert 'Pull request updated to' in response.body
assert 'with 1 added, 0 removed commits.' in response.body
def test_update_target_revision(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'ancestor-new', 'parents': ['ancestor']},
{'message': 'change-rebased'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name, commit_id=commit_ids['change'])
pull_request.target_repo = target
# TODO: johbo: Target ref should be branch based, since tip can jump
# from branch to branch
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# target has ancestor - ancestor-new
# source has ancestor - ancestor-new - change-rebased
backend.pull_heads(target, heads=['ancestor-new'])
backend.pull_heads(source, heads=['change-rebased'])
# update PR
self.app.post(
url(controller='pullrequests', action='update',
repo_name=target.repo_name,
pull_request_id=str(pull_request_id)),
params={'update_commits': 'true', '_method': 'put',
'csrf_token': csrf_token},
status=200)
# check that we have now both revisions
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [commit_ids['change-rebased']]
assert pull_request.target_ref == 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor-new'])
# TODO: johbo: This should be a test on its own
response = self.app.get(url(
controller='pullrequests', action='index',
repo_name=target.repo_name))
assert response.status_int == 200
assert 'Pull request updated to' in response.body
assert 'with 1 added, 1 removed commits.' in response.body
def test_update_of_ancestor_reference(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'change-2'},
{'message': 'ancestor-new', 'parents': ['ancestor']},
{'message': 'change-rebased'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['change'])
pull_request.target_repo = target
# TODO: johbo: Target ref should be branch based, since tip can jump
# from branch to branch
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# target has ancestor - ancestor-new
# source has ancestor - ancestor-new - change-rebased
backend.pull_heads(target, heads=['ancestor-new'])
backend.pull_heads(source, heads=['change-rebased'])
# update PR
self.app.post(
url(controller='pullrequests', action='update',
repo_name=target.repo_name,
pull_request_id=str(pull_request_id)),
params={'update_commits': 'true', '_method': 'put',
'csrf_token': csrf_token},
status=200)
# Expect the target reference to be updated correctly
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [commit_ids['change-rebased']]
expected_target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor-new'])
assert pull_request.target_ref == expected_target_ref
def test_remove_pull_request_branch(self, backend_git, csrf_token):
branch_name = 'development'
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'branch': branch_name},
]
repo = backend_git.create_repo(commits)
commit_ids = backend_git.commit_ids
pull_request = PullRequest()
pull_request.source_repo = repo
pull_request.target_repo = repo
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=branch_name, commit_id=commit_ids['new-feature'])
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend_git.default_branch_name,
commit_id=commit_ids['old-feature'])
pull_request.revisions = [commit_ids['new-feature']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
vcs = repo.scm_instance()
vcs.remove_ref('refs/heads/{}'.format(branch_name))
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=repo.repo_name,
pull_request_id=str(pull_request.pull_request_id)))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
assert_response.element_contains(
'#changeset_compare_view_content .alert',
'This pull request cannot be displayed, because one or more'
' commits no longer exist in the source repository.')
def test_strip_commits_from_pull_request(
self, backend, pr_util, csrf_token):
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'parents': ['initial-commit']},
]
pull_request = pr_util.create_pull_request(
commits, target_head='initial-commit', source_head='new-feature',
revisions=['new-feature'])
vcs = pr_util.source_repository.scm_instance()
if backend.alias == 'git':
vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
else:
vcs.strip(pr_util.commit_ids['new-feature'])
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=pr_util.target_repository.repo_name,
pull_request_id=str(pull_request.pull_request_id)))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
assert_response.element_contains(
'#changeset_compare_view_content .alert',
'This pull request cannot be displayed, because one or more'
' commits no longer exist in the source repository.')
assert_response.element_contains(
'#update_commits',
'Update commits')
def test_strip_commits_and_update(
self, backend, pr_util, csrf_token):
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'parents': ['old-feature']},
]
pull_request = pr_util.create_pull_request(
commits, target_head='old-feature', source_head='new-feature',
revisions=['new-feature'], mergeable=True)
vcs = pr_util.source_repository.scm_instance()
if backend.alias == 'git':
vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
else:
vcs.strip(pr_util.commit_ids['new-feature'])
response = self.app.post(
url(controller='pullrequests', action='update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=str(pull_request.pull_request_id)),
params={'update_commits': 'true', '_method': 'put',
'csrf_token': csrf_token})
assert response.status_int == 200
assert response.body == 'true'
# Make sure that after update, it won't raise 500 errors
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=pr_util.target_repository.repo_name,
pull_request_id=str(pull_request.pull_request_id)))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
def test_branch_is_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:origin:1234567890abcdef'
pull_request.target_ref = 'branch:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
origin_children = origin.getchildren()
assert len(origin_children) == 1
target = assert_response.get_element('.pr-targetinfo .tag')
target_children = target.getchildren()
assert len(target_children) == 1
expected_origin_link = url(
'changelog_home',
repo_name=pull_request.source_repo.scm_instance().name,
branch='origin')
expected_target_link = url(
'changelog_home',
repo_name=pull_request.target_repo.scm_instance().name,
branch='target')
assert origin_children[0].attrib['href'] == expected_origin_link
assert origin_children[0].text == 'branch: origin'
assert target_children[0].attrib['href'] == expected_target_link
assert target_children[0].text == 'branch: target'
def test_bookmark_is_not_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'bookmark:origin:1234567890abcdef'
pull_request.target_ref = 'bookmark:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
assert origin.text.strip() == 'bookmark: origin'
assert origin.getchildren() == []
target = assert_response.get_element('.pr-targetinfo .tag')
assert target.text.strip() == 'bookmark: target'
assert target.getchildren() == []
def test_tag_is_not_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'tag:origin:1234567890abcdef'
pull_request.target_ref = 'tag:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(url(
controller='pullrequests', action='show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=str(pull_request.pull_request_id)))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
assert origin.text.strip() == 'tag: origin'
assert origin.getchildren() == []
target = assert_response.get_element('.pr-targetinfo .tag')
assert target.text.strip() == 'tag: target'
assert target.getchildren() == []
def test_description_is_escaped_on_index_page(self, backend, pr_util):
xss_description = "<script>alert('Hi!')</script>"
pull_request = pr_util.create_pull_request(description=xss_description)
response = self.app.get(url(
controller='pullrequests', action='show_all',
repo_name=pull_request.target_repo.repo_name))
response.mustcontain(
"&lt;script&gt;alert(&#39;Hi!&#39;)&lt;/script&gt;")
def assert_pull_request_status(pull_request, expected_status):
status = ChangesetStatusModel().calculated_review_status(
pull_request=pull_request)
assert status == expected_status
@pytest.mark.parametrize('action', ['show_all', 'index', 'create'])
@pytest.mark.usefixtures("autologin_user")
def test_redirects_to_repo_summary_for_svn_repositories(
backend_svn, app, action):
denied_actions = ['show_all', 'index', 'create']
for action in denied_actions:
response = app.get(url(
controller='pullrequests', action=action,
repo_name=backend_svn.repo_name))
assert response.status_int == 302
# Not allowed, redirect to the summary
redirected = response.follow()
summary_url = url('summary_home', repo_name=backend_svn.repo_name)
# URL adds leading slash and path doesn't have it
assert redirected.req.path == summary_url
def test_delete_comment_returns_404_if_comment_does_not_exist(pylonsapp):
# TODO: johbo: Global import not possible because models.forms blows up
from rhodecode.controllers.pullrequests import PullrequestsController
controller = PullrequestsController()
patcher = mock.patch(
'rhodecode.model.db.BaseModel.get', return_value=None)
with pytest.raises(HTTPNotFound), patcher:
controller._delete_comment(1)