##// END OF EJS Templates
pull-requests: increase stability of concurrent pull requests creation by flushing prematurly the statuses of commits....
pull-requests: increase stability of concurrent pull requests creation by flushing prematurly the statuses of commits. This is required to increase the versions on each concurrent call. Otherwise we could get into an integrity errors of commitsha+version+repo

File last commit:

r3371:e7214a9f default
r3408:2a133f7e stable
Show More
test_repo_pullrequests.py
1233 lines | 51.4 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2018 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import mock
import pytest
import rhodecode
from rhodecode.lib.vcs.backends.base import MergeResponse, MergeFailureReason
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.lib import helpers as h
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.db import (
PullRequest, ChangesetStatus, UserLog, Notification, ChangesetComment, Repository)
from rhodecode.model.meta import Session
from rhodecode.model.pull_request import PullRequestModel
from rhodecode.model.user import UserModel
from rhodecode.tests import (
assert_session_flash, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN)
from rhodecode.tests.utils import AssertResponse
def route_path(name, params=None, **kwargs):
import urllib
base_url = {
'repo_changelog': '/{repo_name}/changelog',
'repo_changelog_file': '/{repo_name}/changelog/{commit_id}/{f_path}',
'pullrequest_show': '/{repo_name}/pull-request/{pull_request_id}',
'pullrequest_show_all': '/{repo_name}/pull-request',
'pullrequest_show_all_data': '/{repo_name}/pull-request-data',
'pullrequest_repo_refs': '/{repo_name}/pull-request/refs/{target_repo_name:.*?[^/]}',
'pullrequest_repo_destinations': '/{repo_name}/pull-request/repo-destinations',
'pullrequest_new': '/{repo_name}/pull-request/new',
'pullrequest_create': '/{repo_name}/pull-request/create',
'pullrequest_update': '/{repo_name}/pull-request/{pull_request_id}/update',
'pullrequest_merge': '/{repo_name}/pull-request/{pull_request_id}/merge',
'pullrequest_delete': '/{repo_name}/pull-request/{pull_request_id}/delete',
'pullrequest_comment_create': '/{repo_name}/pull-request/{pull_request_id}/comment',
'pullrequest_comment_delete': '/{repo_name}/pull-request/{pull_request_id}/comment/{comment_id}/delete',
}[name].format(**kwargs)
if params:
base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
return base_url
@pytest.mark.usefixtures('app', 'autologin_user')
@pytest.mark.backends("git", "hg")
class TestPullrequestsView(object):
def test_index(self, backend):
self.app.get(route_path(
'pullrequest_new',
repo_name=backend.repo_name))
def test_option_menu_create_pull_request_exists(self, backend):
repo_name = backend.repo_name
response = self.app.get(h.route_path('repo_summary', repo_name=repo_name))
create_pr_link = '<a href="%s">Create Pull Request</a>' % route_path(
'pullrequest_new', repo_name=repo_name)
response.mustcontain(create_pr_link)
def test_create_pr_form_with_raw_commit_id(self, backend):
repo = backend.repo
self.app.get(
route_path('pullrequest_new', repo_name=repo.repo_name,
commit=repo.get_commit().raw_id),
status=200)
@pytest.mark.parametrize('pr_merge_enabled', [True, False])
@pytest.mark.parametrize('range_diff', ["0", "1"])
def test_show(self, pr_util, pr_merge_enabled, range_diff):
pull_request = pr_util.create_pull_request(
mergeable=pr_merge_enabled, enable_notifications=False)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id,
params={'range-diff': range_diff}))
for commit_id in pull_request.revisions:
response.mustcontain(commit_id)
assert pull_request.target_ref_parts.type in response
assert pull_request.target_ref_parts.name in response
target_clone_url = pull_request.target_repo.clone_url()
assert target_clone_url in response
assert 'class="pull-request-merge"' in response
if pr_merge_enabled:
response.mustcontain('Pull request reviewer approval is pending')
else:
response.mustcontain('Server-side pull request merging is disabled.')
if range_diff == "1":
response.mustcontain('Turn off: Show the diff as commit range')
def test_close_status_visibility(self, pr_util, user_util, csrf_token):
# Logout
response = self.app.post(
h.route_path('logout'),
params={'csrf_token': csrf_token})
# Login as regular user
response = self.app.post(h.route_path('login'),
{'username': TEST_USER_REGULAR_LOGIN,
'password': 'test12'})
pull_request = pr_util.create_pull_request(
author=TEST_USER_REGULAR_LOGIN)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('Server-side pull request merging is disabled.')
assert_response = response.assert_response()
# for regular user without a merge permissions, we don't see it
assert_response.no_element_exists('#close-pull-request-action')
user_util.grant_user_permission_to_repo(
pull_request.target_repo,
UserModel().get_by_username(TEST_USER_REGULAR_LOGIN),
'repository.write')
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('Server-side pull request merging is disabled.')
assert_response = response.assert_response()
# now regular user has a merge permissions, we have CLOSE button
assert_response.one_element_exists('#close-pull-request-action')
def test_show_invalid_commit_id(self, pr_util):
# Simulating invalid revisions which will cause a lookup error
pull_request = pr_util.create_pull_request()
pull_request.revisions = ['invalid']
Session().add(pull_request)
Session().commit()
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
for commit_id in pull_request.revisions:
response.mustcontain(commit_id)
def test_show_invalid_source_reference(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:b:invalid'
Session().add(pull_request)
Session().commit()
self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
def test_edit_title_description(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
route_path('pullrequest_update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=pull_request_id),
params={
'edit_pull_request': 'true',
'title': 'New title',
'description': 'New description',
'csrf_token': csrf_token})
assert_session_flash(
response, u'Pull request title & description updated.',
category='success')
pull_request = PullRequest.get(pull_request_id)
assert pull_request.title == 'New title'
assert pull_request.description == 'New description'
def test_edit_title_description_closed(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.repo_name
pr_util.close()
response = self.app.post(
route_path('pullrequest_update',
repo_name=repo_name, pull_request_id=pull_request_id),
params={
'edit_pull_request': 'true',
'title': 'New title',
'description': 'New description',
'csrf_token': csrf_token}, status=200)
assert_session_flash(
response, u'Cannot update closed pull requests.',
category='error')
def test_update_invalid_source_reference(self, pr_util, csrf_token):
from rhodecode.lib.vcs.backends.base import UpdateFailureReason
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:invalid-branch:invalid-commit-id'
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
route_path('pullrequest_update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token})
expected_msg = str(PullRequestModel.UPDATE_STATUS_MESSAGES[
UpdateFailureReason.MISSING_SOURCE_REF])
assert_session_flash(response, expected_msg, category='error')
def test_missing_target_reference(self, pr_util, csrf_token):
from rhodecode.lib.vcs.backends.base import MergeFailureReason
pull_request = pr_util.create_pull_request(
approved=True, mergeable=True)
pull_request.target_ref = 'branch:invalid-branch:invalid-commit-id'
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
pull_request_url = route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=pull_request_id)
response = self.app.get(pull_request_url)
assertr = AssertResponse(response)
expected_msg = PullRequestModel.MERGE_STATUS_MESSAGES[
MergeFailureReason.MISSING_TARGET_REF]
assertr.element_contains(
'span[data-role="merge-message"]', str(expected_msg))
def test_comment_and_close_pull_request_custom_message_approved(
self, pr_util, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request(approved=True)
pull_request_id = pull_request.pull_request_id
author = pull_request.user_id
repo = pull_request.target_repo.repo_id
self.app.post(
route_path('pullrequest_comment_create',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request_id),
params={
'close_pull_request': '1',
'text': 'Closing a PR',
'csrf_token': csrf_token},
extra_environ=xhr_header,)
journal = UserLog.query()\
.filter(UserLog.user_id == author)\
.filter(UserLog.repository_id == repo) \
.order_by('user_log_id') \
.all()
assert journal[-1].action == 'repo.pull_request.close'
pull_request = PullRequest.get(pull_request_id)
assert pull_request.is_closed()
status = ChangesetStatusModel().get_status(
pull_request.source_repo, pull_request=pull_request)
assert status == ChangesetStatus.STATUS_APPROVED
comments = ChangesetComment().query() \
.filter(ChangesetComment.pull_request == pull_request) \
.order_by(ChangesetComment.comment_id.asc())\
.all()
assert comments[-1].text == 'Closing a PR'
def test_comment_force_close_pull_request_rejected(
self, pr_util, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
PullRequestModel().update_reviewers(
pull_request_id, [(1, ['reason'], False, []), (2, ['reason2'], False, [])],
pull_request.author)
author = pull_request.user_id
repo = pull_request.target_repo.repo_id
self.app.post(
route_path('pullrequest_comment_create',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request_id),
params={
'close_pull_request': '1',
'csrf_token': csrf_token},
extra_environ=xhr_header)
pull_request = PullRequest.get(pull_request_id)
journal = UserLog.query()\
.filter(UserLog.user_id == author, UserLog.repository_id == repo) \
.order_by('user_log_id') \
.all()
assert journal[-1].action == 'repo.pull_request.close'
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
pull_request.source_repo, pull_request=pull_request)
assert status == ChangesetStatus.STATUS_REJECTED
def test_comment_and_close_pull_request(
self, pr_util, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
route_path('pullrequest_comment_create',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id),
params={
'close_pull_request': 'true',
'csrf_token': csrf_token},
extra_environ=xhr_header)
assert response.json
pull_request = PullRequest.get(pull_request_id)
assert pull_request.is_closed()
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
pull_request.source_repo, pull_request=pull_request)
assert status == ChangesetStatus.STATUS_REJECTED
def test_create_pull_request(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'change2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change2'])
response = self.app.post(
route_path('pullrequest_create', repo_name=source.repo_name),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change2']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor']),
('common_ancestor', commit_ids['ancestor']),
('pullrequest_title', 'Title'),
('pullrequest_desc', 'Description'),
('description_renderer', 'markdown'),
('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '1'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
('__start__', 'rules:sequence'),
('__end__', 'rules:sequence'),
('mandatory', 'False'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('__start__', 'revisions:sequence'),
('revisions', commit_ids['change']),
('revisions', commit_ids['change2']),
('__end__', 'revisions:sequence'),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = location.rsplit('/', 1)[1]
assert pull_request_id != 'new'
pull_request = PullRequest.get(int(pull_request_id))
# check that we have now both revisions
assert pull_request.revisions == [commit_ids['change2'], commit_ids['change']]
assert pull_request.source_ref == 'branch:default:' + commit_ids['change2']
expected_target_ref = 'branch:default:' + commit_ids['ancestor']
assert pull_request.target_ref == expected_target_ref
def test_reviewer_notifications(self, backend, csrf_token):
# We have to use the app.post for this test so it will create the
# notifications properly with the new PR
commits = [
{'message': 'ancestor',
'added': [FileNode('file_A', content='content_of_ancestor')]},
{'message': 'change',
'added': [FileNode('file_a', content='content_of_change')]},
{'message': 'change-child'},
{'message': 'ancestor-child', 'parents': ['ancestor'],
'added': [
FileNode('file_B', content='content_of_ancestor_child')]},
{'message': 'ancestor-child-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor-child'])
source = backend.create_repo(heads=['change'])
response = self.app.post(
route_path('pullrequest_create', repo_name=source.repo_name),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor-child']),
('common_ancestor', commit_ids['ancestor']),
('pullrequest_title', 'Title'),
('pullrequest_desc', 'Description'),
('description_renderer', 'markdown'),
('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '2'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
('__start__', 'rules:sequence'),
('__end__', 'rules:sequence'),
('mandatory', 'False'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('__start__', 'revisions:sequence'),
('revisions', commit_ids['change']),
('__end__', 'revisions:sequence'),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = location.rsplit('/', 1)[1]
assert pull_request_id != 'new'
pull_request = PullRequest.get(int(pull_request_id))
# Check that a notification was made
notifications = Notification.query()\
.filter(Notification.created_by == pull_request.author.user_id,
Notification.type_ == Notification.TYPE_PULL_REQUEST,
Notification.subject.contains(
"wants you to review pull request #%s" % pull_request_id))
assert len(notifications.all()) == 1
# Change reviewers and check that a notification was made
PullRequestModel().update_reviewers(
pull_request.pull_request_id, [(1, [], False, [])],
pull_request.author)
assert len(notifications.all()) == 2
def test_create_pull_request_stores_ancestor_commit_id(self, backend,
csrf_token):
commits = [
{'message': 'ancestor',
'added': [FileNode('file_A', content='content_of_ancestor')]},
{'message': 'change',
'added': [FileNode('file_a', content='content_of_change')]},
{'message': 'change-child'},
{'message': 'ancestor-child', 'parents': ['ancestor'],
'added': [
FileNode('file_B', content='content_of_ancestor_child')]},
{'message': 'ancestor-child-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor-child'])
source = backend.create_repo(heads=['change'])
response = self.app.post(
route_path('pullrequest_create', repo_name=source.repo_name),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor-child']),
('common_ancestor', commit_ids['ancestor']),
('pullrequest_title', 'Title'),
('pullrequest_desc', 'Description'),
('description_renderer', 'markdown'),
('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '1'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
('__start__', 'rules:sequence'),
('__end__', 'rules:sequence'),
('mandatory', 'False'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('__start__', 'revisions:sequence'),
('revisions', commit_ids['change']),
('__end__', 'revisions:sequence'),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = location.rsplit('/', 1)[1]
assert pull_request_id != 'new'
pull_request = PullRequest.get(int(pull_request_id))
# target_ref has to point to the ancestor's commit_id in order to
# show the correct diff
expected_target_ref = 'branch:default:' + commit_ids['ancestor']
assert pull_request.target_ref == expected_target_ref
# Check generated diff contents
response = response.follow()
assert 'content_of_ancestor' not in response.body
assert 'content_of_ancestor-child' not in response.body
assert 'content_of_change' in response.body
def test_merge_pull_request_enabled(self, pr_util, csrf_token):
# Clear any previous calls to rcextensions
rhodecode.EXTENSIONS.calls.clear()
pull_request = pr_util.create_pull_request(
approved=True, mergeable=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name,
response = self.app.post(
route_path('pullrequest_merge',
repo_name=str(repo_name[0]),
pull_request_id=pull_request_id),
params={'csrf_token': csrf_token}).follow()
pull_request = PullRequest.get(pull_request_id)
assert response.status_int == 200
assert pull_request.is_closed()
assert_pull_request_status(
pull_request, ChangesetStatus.STATUS_APPROVED)
# Check the relevant log entries were added
user_logs = UserLog.query().order_by('-user_log_id').limit(3)
actions = [log.action for log in user_logs]
pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
expected_actions = [
u'repo.pull_request.close',
u'repo.pull_request.merge',
u'repo.pull_request.comment.create'
]
assert actions == expected_actions
user_logs = UserLog.query().order_by('-user_log_id').limit(4)
actions = [log for log in user_logs]
assert actions[-1].action == 'user.push'
assert actions[-1].action_data['commit_ids'] == pr_commit_ids
# Check post_push rcextension was really executed
push_calls = rhodecode.EXTENSIONS.calls['_push_hook']
assert len(push_calls) == 1
unused_last_call_args, last_call_kwargs = push_calls[0]
assert last_call_kwargs['action'] == 'push'
assert last_call_kwargs['commit_ids'] == pr_commit_ids
def test_merge_pull_request_disabled(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(mergeable=False)
pull_request_id = pull_request.pull_request_id
pull_request = PullRequest.get(pull_request_id)
response = self.app.post(
route_path('pullrequest_merge',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id),
params={'csrf_token': csrf_token}).follow()
assert response.status_int == 200
response.mustcontain(
'Merge is not currently possible because of below failed checks.')
response.mustcontain('Server-side pull request merging is disabled.')
@pytest.mark.skip_backends('svn')
def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(mergeable=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name
response = self.app.post(
route_path('pullrequest_merge',
repo_name=repo_name,
pull_request_id=pull_request_id),
params={'csrf_token': csrf_token}).follow()
assert response.status_int == 200
response.mustcontain(
'Merge is not currently possible because of below failed checks.')
response.mustcontain('Pull request reviewer approval is pending.')
def test_merge_pull_request_renders_failure_reason(
self, user_regular, csrf_token, pr_util):
pull_request = pr_util.create_pull_request(mergeable=True, approved=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name
model_patcher = mock.patch.multiple(
PullRequestModel,
merge_repo=mock.Mock(return_value=MergeResponse(
True, False, 'STUB_COMMIT_ID', MergeFailureReason.PUSH_FAILED)),
merge_status=mock.Mock(return_value=(True, 'WRONG_MESSAGE')))
with model_patcher:
response = self.app.post(
route_path('pullrequest_merge',
repo_name=repo_name,
pull_request_id=pull_request_id),
params={'csrf_token': csrf_token}, status=302)
assert_session_flash(response, PullRequestModel.MERGE_STATUS_MESSAGES[
MergeFailureReason.PUSH_FAILED])
def test_update_source_revision(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'change-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name, commit_id=commit_ids['change'])
pull_request.target_repo = target
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# source has ancestor - change - change-2
backend.pull_heads(source, heads=['change-2'])
# update PR
self.app.post(
route_path('pullrequest_update',
repo_name=target.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token})
# check that we have now both revisions
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [
commit_ids['change-2'], commit_ids['change']]
# TODO: johbo: this should be a test on its own
response = self.app.get(route_path(
'pullrequest_new',
repo_name=target.repo_name))
assert response.status_int == 200
assert 'Pull request updated to' in response.body
assert 'with 1 added, 0 removed commits.' in response.body
def test_update_target_revision(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'ancestor-new', 'parents': ['ancestor']},
{'message': 'change-rebased'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name, commit_id=commit_ids['change'])
pull_request.target_repo = target
# TODO: johbo: Target ref should be branch based, since tip can jump
# from branch to branch
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# target has ancestor - ancestor-new
# source has ancestor - ancestor-new - change-rebased
backend.pull_heads(target, heads=['ancestor-new'])
backend.pull_heads(source, heads=['change-rebased'])
# update PR
self.app.post(
route_path('pullrequest_update',
repo_name=target.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token},
status=200)
# check that we have now both revisions
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [commit_ids['change-rebased']]
assert pull_request.target_ref == 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor-new'])
# TODO: johbo: This should be a test on its own
response = self.app.get(route_path(
'pullrequest_new',
repo_name=target.repo_name))
assert response.status_int == 200
assert 'Pull request updated to' in response.body
assert 'with 1 added, 1 removed commits.' in response.body
def test_update_target_revision_with_removal_of_1_commit_git(self, backend_git, csrf_token):
backend = backend_git
commits = [
{'message': 'master-commit-1'},
{'message': 'master-commit-2-change-1'},
{'message': 'master-commit-3-change-2'},
{'message': 'feat-commit-1', 'parents': ['master-commit-1']},
{'message': 'feat-commit-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['master-commit-3-change-2'])
source = backend.create_repo(heads=['feat-commit-2'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['master-commit-3-change-2'])
pull_request.target_repo = target
# TODO: johbo: Target ref should be branch based, since tip can jump
# from branch to branch
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['feat-commit-2'])
pull_request.revisions = [
commit_ids['feat-commit-1'],
commit_ids['feat-commit-2']
]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# PR is created, now we simulate a force-push into target,
# that drops a 2 last commits
vcsrepo = target.scm_instance()
vcsrepo.config.clear_section('hooks')
vcsrepo.run_git_command(['reset', '--soft', 'HEAD~2'])
# update PR
self.app.post(
route_path('pullrequest_update',
repo_name=target.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token},
status=200)
response = self.app.get(route_path(
'pullrequest_new',
repo_name=target.repo_name))
assert response.status_int == 200
response.mustcontain('Pull request updated to')
response.mustcontain('with 0 added, 0 removed commits.')
def test_update_of_ancestor_reference(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'change-2'},
{'message': 'ancestor-new', 'parents': ['ancestor']},
{'message': 'change-rebased'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['change'])
pull_request.target_repo = target
# TODO: johbo: Target ref should be branch based, since tip can jump
# from branch to branch
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# target has ancestor - ancestor-new
# source has ancestor - ancestor-new - change-rebased
backend.pull_heads(target, heads=['ancestor-new'])
backend.pull_heads(source, heads=['change-rebased'])
# update PR
self.app.post(
route_path('pullrequest_update',
repo_name=target.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token},
status=200)
# Expect the target reference to be updated correctly
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [commit_ids['change-rebased']]
expected_target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor-new'])
assert pull_request.target_ref == expected_target_ref
def test_remove_pull_request_branch(self, backend_git, csrf_token):
branch_name = 'development'
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'branch': branch_name},
]
repo = backend_git.create_repo(commits)
commit_ids = backend_git.commit_ids
pull_request = PullRequest()
pull_request.source_repo = repo
pull_request.target_repo = repo
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=branch_name, commit_id=commit_ids['new-feature'])
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend_git.default_branch_name,
commit_id=commit_ids['old-feature'])
pull_request.revisions = [commit_ids['new-feature']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
vcs = repo.scm_instance()
vcs.remove_ref('refs/heads/{}'.format(branch_name))
response = self.app.get(route_path(
'pullrequest_show',
repo_name=repo.repo_name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
assert_response.element_contains(
'#changeset_compare_view_content .alert',
'This pull request cannot be displayed, because one or more'
' commits no longer exist in the source repository.')
def test_strip_commits_from_pull_request(
self, backend, pr_util, csrf_token):
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'parents': ['initial-commit']},
]
pull_request = pr_util.create_pull_request(
commits, target_head='initial-commit', source_head='new-feature',
revisions=['new-feature'])
vcs = pr_util.source_repository.scm_instance()
if backend.alias == 'git':
vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
else:
vcs.strip(pr_util.commit_ids['new-feature'])
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pr_util.target_repository.repo_name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
assert_response.element_contains(
'#changeset_compare_view_content .alert',
'This pull request cannot be displayed, because one or more'
' commits no longer exist in the source repository.')
assert_response.element_contains(
'#update_commits',
'Update commits')
def test_strip_commits_and_update(
self, backend, pr_util, csrf_token):
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'parents': ['old-feature']},
]
pull_request = pr_util.create_pull_request(
commits, target_head='old-feature', source_head='new-feature',
revisions=['new-feature'], mergeable=True)
vcs = pr_util.source_repository.scm_instance()
if backend.alias == 'git':
vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
else:
vcs.strip(pr_util.commit_ids['new-feature'])
response = self.app.post(
route_path('pullrequest_update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=pull_request.pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token})
assert response.status_int == 200
assert response.body == 'true'
# Make sure that after update, it won't raise 500 errors
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pr_util.target_repository.repo_name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
def test_branch_is_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:origin:1234567890abcdef'
pull_request.target_ref = 'branch:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
origin_children = origin.getchildren()
assert len(origin_children) == 1
target = assert_response.get_element('.pr-targetinfo .tag')
target_children = target.getchildren()
assert len(target_children) == 1
expected_origin_link = route_path(
'repo_changelog',
repo_name=pull_request.source_repo.scm_instance().name,
params=dict(branch='origin'))
expected_target_link = route_path(
'repo_changelog',
repo_name=pull_request.target_repo.scm_instance().name,
params=dict(branch='target'))
assert origin_children[0].attrib['href'] == expected_origin_link
assert origin_children[0].text == 'branch: origin'
assert target_children[0].attrib['href'] == expected_target_link
assert target_children[0].text == 'branch: target'
def test_bookmark_is_not_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'bookmark:origin:1234567890abcdef'
pull_request.target_ref = 'bookmark:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
assert origin.text.strip() == 'bookmark: origin'
assert origin.getchildren() == []
target = assert_response.get_element('.pr-targetinfo .tag')
assert target.text.strip() == 'bookmark: target'
assert target.getchildren() == []
def test_tag_is_not_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'tag:origin:1234567890abcdef'
pull_request.target_ref = 'tag:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
assert origin.text.strip() == 'tag: origin'
assert origin.getchildren() == []
target = assert_response.get_element('.pr-targetinfo .tag')
assert target.text.strip() == 'tag: target'
assert target.getchildren() == []
@pytest.mark.parametrize('mergeable', [True, False])
def test_shadow_repository_link(
self, mergeable, pr_util, http_host_only_stub):
"""
Check that the pull request summary page displays a link to the shadow
repository if the pull request is mergeable. If it is not mergeable
the link should not be displayed.
"""
pull_request = pr_util.create_pull_request(
mergeable=mergeable, enable_notifications=False)
target_repo = pull_request.target_repo.scm_instance()
pr_id = pull_request.pull_request_id
shadow_url = '{host}/{repo}/pull-request/{pr_id}/repository'.format(
host=http_host_only_stub, repo=target_repo.name, pr_id=pr_id)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=target_repo.name,
pull_request_id=pr_id))
assertr = AssertResponse(response)
if mergeable:
assertr.element_value_contains('input.pr-mergeinfo', shadow_url)
assertr.element_value_contains('input.pr-mergeinfo ', 'pr-merge')
else:
assertr.no_element_exists('.pr-mergeinfo')
@pytest.mark.usefixtures('app')
@pytest.mark.backends("git", "hg")
class TestPullrequestsControllerDelete(object):
def test_pull_request_delete_button_permissions_admin(
self, autologin_user, user_admin, pr_util):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('id="delete_pullrequest"')
response.mustcontain('Confirm to delete this pull request')
def test_pull_request_delete_button_permissions_owner(
self, autologin_regular_user, user_regular, pr_util):
pull_request = pr_util.create_pull_request(
author=user_regular.username, enable_notifications=False)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('id="delete_pullrequest"')
response.mustcontain('Confirm to delete this pull request')
def test_pull_request_delete_button_permissions_forbidden(
self, autologin_regular_user, user_regular, user_admin, pr_util):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain(no=['id="delete_pullrequest"'])
response.mustcontain(no=['Confirm to delete this pull request'])
def test_pull_request_delete_button_permissions_can_update_cannot_delete(
self, autologin_regular_user, user_regular, user_admin, pr_util,
user_util):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
user_util.grant_user_permission_to_repo(
pull_request.target_repo, user_regular,
'repository.write')
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('id="open_edit_pullrequest"')
response.mustcontain('id="delete_pullrequest"')
response.mustcontain(no=['Confirm to delete this pull request'])
def test_delete_comment_returns_404_if_comment_does_not_exist(
self, autologin_user, pr_util, user_admin, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
self.app.post(
route_path(
'pullrequest_comment_delete',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id,
comment_id=1024404),
extra_environ=xhr_header,
params={'csrf_token': csrf_token},
status=404
)
def test_delete_comment(
self, autologin_user, pr_util, user_admin, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
comment = pr_util.create_comment()
comment_id = comment.comment_id
response = self.app.post(
route_path(
'pullrequest_comment_delete',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id,
comment_id=comment_id),
extra_environ=xhr_header,
params={'csrf_token': csrf_token},
status=200
)
assert response.body == 'true'
@pytest.mark.parametrize('url_type', [
'pullrequest_new',
'pullrequest_create',
'pullrequest_update',
'pullrequest_merge',
])
def test_pull_request_is_forbidden_on_archived_repo(
self, autologin_user, backend, xhr_header, user_util, url_type):
# create a temporary repo
source = user_util.create_repo(repo_type=backend.alias)
repo_name = source.repo_name
repo = Repository.get_by_repo_name(repo_name)
repo.archived = True
Session().commit()
response = self.app.get(
route_path(url_type, repo_name=repo_name, pull_request_id=1), status=302)
msg = 'Action not supported for archived repository.'
assert_session_flash(response, msg)
def assert_pull_request_status(pull_request, expected_status):
status = ChangesetStatusModel().calculated_review_status(
pull_request=pull_request)
assert status == expected_status
@pytest.mark.parametrize('route', ['pullrequest_new', 'pullrequest_create'])
@pytest.mark.usefixtures("autologin_user")
def test_forbidde_to_repo_summary_for_svn_repositories(backend_svn, app, route):
response = app.get(
route_path(route, repo_name=backend_svn.repo_name), status=404)