##// END OF EJS Templates
caches: use repo.lru based Dict cache. This LRUDict uses Timing Algo to not have to use locking...
caches: use repo.lru based Dict cache. This LRUDict uses Timing Algo to not have to use locking for the LRU implementation, this it's safer to use for dogpile. We used it before with beaker, so it's generally more stable.

File last commit:

r2903:6d16d1cd default
r2945:ec5716e4 default
Show More
test_repo_pullrequests.py
1206 lines | 50.4 KiB | text/x-python | PythonLexer
/ rhodecode / apps / repository / tests / test_repo_pullrequests.py
pull-requests: migrated code from pylons to pyramid
r1974 # -*- coding: utf-8 -*-
release: update copyright year to 2018
r2487 # Copyright (C) 2010-2018 RhodeCode GmbH
pull-requests: migrated code from pylons to pyramid
r1974 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import mock
import pytest
import rhodecode
from rhodecode.lib.vcs.backends.base import MergeResponse, MergeFailureReason
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.lib import helpers as h
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.db import (
PullRequest, ChangesetStatus, UserLog, Notification, ChangesetComment)
from rhodecode.model.meta import Session
from rhodecode.model.pull_request import PullRequestModel
from rhodecode.model.user import UserModel
from rhodecode.tests import (
assert_session_flash, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN)
from rhodecode.tests.utils import AssertResponse
def route_path(name, params=None, **kwargs):
import urllib
base_url = {
'repo_changelog': '/{repo_name}/changelog',
'repo_changelog_file': '/{repo_name}/changelog/{commit_id}/{f_path}',
'pullrequest_show': '/{repo_name}/pull-request/{pull_request_id}',
'pullrequest_show_all': '/{repo_name}/pull-request',
'pullrequest_show_all_data': '/{repo_name}/pull-request-data',
'pullrequest_repo_refs': '/{repo_name}/pull-request/refs/{target_repo_name:.*?[^/]}',
'pullrequest_repo_destinations': '/{repo_name}/pull-request/repo-destinations',
'pullrequest_new': '/{repo_name}/pull-request/new',
'pullrequest_create': '/{repo_name}/pull-request/create',
'pullrequest_update': '/{repo_name}/pull-request/{pull_request_id}/update',
'pullrequest_merge': '/{repo_name}/pull-request/{pull_request_id}/merge',
'pullrequest_delete': '/{repo_name}/pull-request/{pull_request_id}/delete',
'pullrequest_comment_create': '/{repo_name}/pull-request/{pull_request_id}/comment',
'pullrequest_comment_delete': '/{repo_name}/pull-request/{pull_request_id}/comment/{comment_id}/delete',
}[name].format(**kwargs)
if params:
base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
return base_url
@pytest.mark.usefixtures('app', 'autologin_user')
@pytest.mark.backends("git", "hg")
class TestPullrequestsView(object):
def test_index(self, backend):
self.app.get(route_path(
'pullrequest_new',
repo_name=backend.repo_name))
def test_option_menu_create_pull_request_exists(self, backend):
repo_name = backend.repo_name
response = self.app.get(h.route_path('repo_summary', repo_name=repo_name))
create_pr_link = '<a href="%s">Create Pull Request</a>' % route_path(
'pullrequest_new', repo_name=repo_name)
response.mustcontain(create_pr_link)
def test_create_pr_form_with_raw_commit_id(self, backend):
repo = backend.repo
self.app.get(
route_path('pullrequest_new',
repo_name=repo.repo_name,
commit=repo.get_commit().raw_id),
status=200)
@pytest.mark.parametrize('pr_merge_enabled', [True, False])
def test_show(self, pr_util, pr_merge_enabled):
pull_request = pr_util.create_pull_request(
mergeable=pr_merge_enabled, enable_notifications=False)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
for commit_id in pull_request.revisions:
response.mustcontain(commit_id)
assert pull_request.target_ref_parts.type in response
assert pull_request.target_ref_parts.name in response
target_clone_url = pull_request.target_repo.clone_url()
assert target_clone_url in response
assert 'class="pull-request-merge"' in response
assert (
'Server-side pull request merging is disabled.'
in response) != pr_merge_enabled
def test_close_status_visibility(self, pr_util, user_util, csrf_token):
# Logout
response = self.app.post(
h.route_path('logout'),
params={'csrf_token': csrf_token})
# Login as regular user
response = self.app.post(h.route_path('login'),
{'username': TEST_USER_REGULAR_LOGIN,
'password': 'test12'})
pull_request = pr_util.create_pull_request(
author=TEST_USER_REGULAR_LOGIN)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('Server-side pull request merging is disabled.')
assert_response = response.assert_response()
# for regular user without a merge permissions, we don't see it
assert_response.no_element_exists('#close-pull-request-action')
user_util.grant_user_permission_to_repo(
pull_request.target_repo,
UserModel().get_by_username(TEST_USER_REGULAR_LOGIN),
'repository.write')
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('Server-side pull request merging is disabled.')
assert_response = response.assert_response()
# now regular user has a merge permissions, we have CLOSE button
assert_response.one_element_exists('#close-pull-request-action')
def test_show_invalid_commit_id(self, pr_util):
# Simulating invalid revisions which will cause a lookup error
pull_request = pr_util.create_pull_request()
pull_request.revisions = ['invalid']
Session().add(pull_request)
Session().commit()
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
for commit_id in pull_request.revisions:
response.mustcontain(commit_id)
def test_show_invalid_source_reference(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:b:invalid'
Session().add(pull_request)
Session().commit()
self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
def test_edit_title_description(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
route_path('pullrequest_update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=pull_request_id),
params={
'edit_pull_request': 'true',
'title': 'New title',
'description': 'New description',
'csrf_token': csrf_token})
assert_session_flash(
response, u'Pull request title & description updated.',
category='success')
pull_request = PullRequest.get(pull_request_id)
assert pull_request.title == 'New title'
assert pull_request.description == 'New description'
def test_edit_title_description_closed(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
pull-requests: forbid doing any changes on closed pull-requests....
r2383 repo_name = pull_request.target_repo.repo_name
pull-requests: migrated code from pylons to pyramid
r1974 pr_util.close()
response = self.app.post(
route_path('pullrequest_update',
pull-requests: forbid doing any changes on closed pull-requests....
r2383 repo_name=repo_name, pull_request_id=pull_request_id),
pull-requests: migrated code from pylons to pyramid
r1974 params={
'edit_pull_request': 'true',
'title': 'New title',
'description': 'New description',
pull-requests: forbid doing any changes on closed pull-requests....
r2383 'csrf_token': csrf_token}, status=200)
pull-requests: migrated code from pylons to pyramid
r1974 assert_session_flash(
response, u'Cannot update closed pull requests.',
category='error')
def test_update_invalid_source_reference(self, pr_util, csrf_token):
from rhodecode.lib.vcs.backends.base import UpdateFailureReason
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:invalid-branch:invalid-commit-id'
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
route_path('pullrequest_update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token})
pull-requests: trigger merge simulation during PR creation. Fixes #5396
r2168 expected_msg = str(PullRequestModel.UPDATE_STATUS_MESSAGES[
UpdateFailureReason.MISSING_SOURCE_REF])
pull-requests: migrated code from pylons to pyramid
r1974 assert_session_flash(response, expected_msg, category='error')
def test_missing_target_reference(self, pr_util, csrf_token):
from rhodecode.lib.vcs.backends.base import MergeFailureReason
pull_request = pr_util.create_pull_request(
approved=True, mergeable=True)
pull_request.target_ref = 'branch:invalid-branch:invalid-commit-id'
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
pull_request_url = route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=pull_request_id)
response = self.app.get(pull_request_url)
assertr = AssertResponse(response)
expected_msg = PullRequestModel.MERGE_STATUS_MESSAGES[
MergeFailureReason.MISSING_TARGET_REF]
assertr.element_contains(
'span[data-role="merge-message"]', str(expected_msg))
def test_comment_and_close_pull_request_custom_message_approved(
self, pr_util, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request(approved=True)
pull_request_id = pull_request.pull_request_id
author = pull_request.user_id
repo = pull_request.target_repo.repo_id
self.app.post(
route_path('pullrequest_comment_create',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request_id),
params={
'close_pull_request': '1',
'text': 'Closing a PR',
'csrf_token': csrf_token},
extra_environ=xhr_header,)
journal = UserLog.query()\
.filter(UserLog.user_id == author)\
.filter(UserLog.repository_id == repo) \
.order_by('user_log_id') \
.all()
assert journal[-1].action == 'repo.pull_request.close'
pull_request = PullRequest.get(pull_request_id)
assert pull_request.is_closed()
status = ChangesetStatusModel().get_status(
pull_request.source_repo, pull_request=pull_request)
assert status == ChangesetStatus.STATUS_APPROVED
comments = ChangesetComment().query() \
.filter(ChangesetComment.pull_request == pull_request) \
.order_by(ChangesetComment.comment_id.asc())\
.all()
assert comments[-1].text == 'Closing a PR'
def test_comment_force_close_pull_request_rejected(
self, pr_util, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
PullRequestModel().update_reviewers(
default-reviewers: introduce new voting rule logic that allows...
r2484 pull_request_id, [(1, ['reason'], False, []), (2, ['reason2'], False, [])],
pull-requests: migrated code from pylons to pyramid
r1974 pull_request.author)
author = pull_request.user_id
repo = pull_request.target_repo.repo_id
self.app.post(
route_path('pullrequest_comment_create',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request_id),
params={
'close_pull_request': '1',
'csrf_token': csrf_token},
extra_environ=xhr_header)
pull_request = PullRequest.get(pull_request_id)
journal = UserLog.query()\
.filter(UserLog.user_id == author, UserLog.repository_id == repo) \
.order_by('user_log_id') \
.all()
assert journal[-1].action == 'repo.pull_request.close'
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
pull_request.source_repo, pull_request=pull_request)
assert status == ChangesetStatus.STATUS_REJECTED
def test_comment_and_close_pull_request(
self, pr_util, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
route_path('pullrequest_comment_create',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id),
params={
'close_pull_request': 'true',
'csrf_token': csrf_token},
extra_environ=xhr_header)
assert response.json
pull_request = PullRequest.get(pull_request_id)
assert pull_request.is_closed()
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
pull_request.source_repo, pull_request=pull_request)
assert status == ChangesetStatus.STATUS_REJECTED
def test_create_pull_request(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'change2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change2'])
response = self.app.post(
route_path('pullrequest_create', repo_name=source.repo_name),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change2']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor']),
('common_ancestor', commit_ids['ancestor']),
pull-requests: make the renderer stored and saved for each pull requests....
r2903 ('pullrequest_title', 'Title'),
pull-requests: migrated code from pylons to pyramid
r1974 ('pullrequest_desc', 'Description'),
pull-requests: make the renderer stored and saved for each pull requests....
r2903 ('description_renderer', 'markdown'),
pull-requests: migrated code from pylons to pyramid
r1974 ('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '1'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
default-reviewers: introduce new voting rule logic that allows...
r2484 ('__start__', 'rules:sequence'),
('__end__', 'rules:sequence'),
pull-requests: migrated code from pylons to pyramid
r1974 ('mandatory', 'False'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('__start__', 'revisions:sequence'),
('revisions', commit_ids['change']),
('revisions', commit_ids['change2']),
('__end__', 'revisions:sequence'),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = location.rsplit('/', 1)[1]
assert pull_request_id != 'new'
pull_request = PullRequest.get(int(pull_request_id))
# check that we have now both revisions
assert pull_request.revisions == [commit_ids['change2'], commit_ids['change']]
assert pull_request.source_ref == 'branch:default:' + commit_ids['change2']
expected_target_ref = 'branch:default:' + commit_ids['ancestor']
assert pull_request.target_ref == expected_target_ref
def test_reviewer_notifications(self, backend, csrf_token):
# We have to use the app.post for this test so it will create the
# notifications properly with the new PR
commits = [
{'message': 'ancestor',
'added': [FileNode('file_A', content='content_of_ancestor')]},
{'message': 'change',
'added': [FileNode('file_a', content='content_of_change')]},
{'message': 'change-child'},
{'message': 'ancestor-child', 'parents': ['ancestor'],
'added': [
FileNode('file_B', content='content_of_ancestor_child')]},
{'message': 'ancestor-child-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor-child'])
source = backend.create_repo(heads=['change'])
response = self.app.post(
route_path('pullrequest_create', repo_name=source.repo_name),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor-child']),
('common_ancestor', commit_ids['ancestor']),
pull-requests: make the renderer stored and saved for each pull requests....
r2903 ('pullrequest_title', 'Title'),
pull-requests: migrated code from pylons to pyramid
r1974 ('pullrequest_desc', 'Description'),
pull-requests: make the renderer stored and saved for each pull requests....
r2903 ('description_renderer', 'markdown'),
pull-requests: migrated code from pylons to pyramid
r1974 ('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '2'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
default-reviewers: introduce new voting rule logic that allows...
r2484 ('__start__', 'rules:sequence'),
('__end__', 'rules:sequence'),
pull-requests: migrated code from pylons to pyramid
r1974 ('mandatory', 'False'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('__start__', 'revisions:sequence'),
('revisions', commit_ids['change']),
('__end__', 'revisions:sequence'),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = location.rsplit('/', 1)[1]
assert pull_request_id != 'new'
pull_request = PullRequest.get(int(pull_request_id))
# Check that a notification was made
notifications = Notification.query()\
.filter(Notification.created_by == pull_request.author.user_id,
Notification.type_ == Notification.TYPE_PULL_REQUEST,
Notification.subject.contains(
"wants you to review pull request #%s" % pull_request_id))
assert len(notifications.all()) == 1
# Change reviewers and check that a notification was made
PullRequestModel().update_reviewers(
default-reviewers: introduce new voting rule logic that allows...
r2484 pull_request.pull_request_id, [(1, [], False, [])],
pull-requests: migrated code from pylons to pyramid
r1974 pull_request.author)
assert len(notifications.all()) == 2
def test_create_pull_request_stores_ancestor_commit_id(self, backend,
csrf_token):
commits = [
{'message': 'ancestor',
'added': [FileNode('file_A', content='content_of_ancestor')]},
{'message': 'change',
'added': [FileNode('file_a', content='content_of_change')]},
{'message': 'change-child'},
{'message': 'ancestor-child', 'parents': ['ancestor'],
'added': [
FileNode('file_B', content='content_of_ancestor_child')]},
{'message': 'ancestor-child-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor-child'])
source = backend.create_repo(heads=['change'])
response = self.app.post(
route_path('pullrequest_create', repo_name=source.repo_name),
[
('source_repo', source.repo_name),
('source_ref', 'branch:default:' + commit_ids['change']),
('target_repo', target.repo_name),
('target_ref', 'branch:default:' + commit_ids['ancestor-child']),
('common_ancestor', commit_ids['ancestor']),
pull-requests: make the renderer stored and saved for each pull requests....
r2903 ('pullrequest_title', 'Title'),
pull-requests: migrated code from pylons to pyramid
r1974 ('pullrequest_desc', 'Description'),
pull-requests: make the renderer stored and saved for each pull requests....
r2903 ('description_renderer', 'markdown'),
pull-requests: migrated code from pylons to pyramid
r1974 ('__start__', 'review_members:sequence'),
('__start__', 'reviewer:mapping'),
('user_id', '1'),
('__start__', 'reasons:sequence'),
('reason', 'Some reason'),
('__end__', 'reasons:sequence'),
default-reviewers: introduce new voting rule logic that allows...
r2484 ('__start__', 'rules:sequence'),
('__end__', 'rules:sequence'),
pull-requests: migrated code from pylons to pyramid
r1974 ('mandatory', 'False'),
('__end__', 'reviewer:mapping'),
('__end__', 'review_members:sequence'),
('__start__', 'revisions:sequence'),
('revisions', commit_ids['change']),
('__end__', 'revisions:sequence'),
('user', ''),
('csrf_token', csrf_token),
],
status=302)
location = response.headers['Location']
pull_request_id = location.rsplit('/', 1)[1]
assert pull_request_id != 'new'
pull_request = PullRequest.get(int(pull_request_id))
# target_ref has to point to the ancestor's commit_id in order to
# show the correct diff
expected_target_ref = 'branch:default:' + commit_ids['ancestor']
assert pull_request.target_ref == expected_target_ref
# Check generated diff contents
response = response.follow()
assert 'content_of_ancestor' not in response.body
assert 'content_of_ancestor-child' not in response.body
assert 'content_of_change' in response.body
def test_merge_pull_request_enabled(self, pr_util, csrf_token):
# Clear any previous calls to rcextensions
rhodecode.EXTENSIONS.calls.clear()
pull_request = pr_util.create_pull_request(
approved=True, mergeable=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name,
response = self.app.post(
route_path('pullrequest_merge',
repo_name=str(repo_name[0]),
pull_request_id=pull_request_id),
params={'csrf_token': csrf_token}).follow()
pull_request = PullRequest.get(pull_request_id)
assert response.status_int == 200
assert pull_request.is_closed()
assert_pull_request_status(
pull_request, ChangesetStatus.STATUS_APPROVED)
# Check the relevant log entries were added
user_logs = UserLog.query().order_by('-user_log_id').limit(3)
actions = [log.action for log in user_logs]
pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
expected_actions = [
u'repo.pull_request.close',
u'repo.pull_request.merge',
u'repo.pull_request.comment.create'
]
assert actions == expected_actions
user_logs = UserLog.query().order_by('-user_log_id').limit(4)
actions = [log for log in user_logs]
assert actions[-1].action == 'user.push'
assert actions[-1].action_data['commit_ids'] == pr_commit_ids
# Check post_push rcextension was really executed
push_calls = rhodecode.EXTENSIONS.calls['post_push']
assert len(push_calls) == 1
unused_last_call_args, last_call_kwargs = push_calls[0]
assert last_call_kwargs['action'] == 'push'
assert last_call_kwargs['pushed_revs'] == pr_commit_ids
def test_merge_pull_request_disabled(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(mergeable=False)
pull_request_id = pull_request.pull_request_id
pull_request = PullRequest.get(pull_request_id)
response = self.app.post(
route_path('pullrequest_merge',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id),
params={'csrf_token': csrf_token}).follow()
assert response.status_int == 200
response.mustcontain(
'Merge is not currently possible because of below failed checks.')
response.mustcontain('Server-side pull request merging is disabled.')
@pytest.mark.skip_backends('svn')
def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(mergeable=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name
response = self.app.post(
route_path('pullrequest_merge',
repo_name=repo_name,
pull_request_id=pull_request_id),
params={'csrf_token': csrf_token}).follow()
assert response.status_int == 200
response.mustcontain(
'Merge is not currently possible because of below failed checks.')
response.mustcontain('Pull request reviewer approval is pending.')
def test_merge_pull_request_renders_failure_reason(
self, user_regular, csrf_token, pr_util):
pull_request = pr_util.create_pull_request(mergeable=True, approved=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name
model_patcher = mock.patch.multiple(
PullRequestModel,
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 merge_repo=mock.Mock(return_value=MergeResponse(
pull-requests: migrated code from pylons to pyramid
r1974 True, False, 'STUB_COMMIT_ID', MergeFailureReason.PUSH_FAILED)),
merge_status=mock.Mock(return_value=(True, 'WRONG_MESSAGE')))
with model_patcher:
response = self.app.post(
route_path('pullrequest_merge',
repo_name=repo_name,
pull_request_id=pull_request_id),
params={'csrf_token': csrf_token}, status=302)
assert_session_flash(response, PullRequestModel.MERGE_STATUS_MESSAGES[
MergeFailureReason.PUSH_FAILED])
def test_update_source_revision(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'change-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name, commit_id=commit_ids['change'])
pull_request.target_repo = target
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# source has ancestor - change - change-2
backend.pull_heads(source, heads=['change-2'])
# update PR
self.app.post(
route_path('pullrequest_update',
repo_name=target.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token})
# check that we have now both revisions
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [
commit_ids['change-2'], commit_ids['change']]
# TODO: johbo: this should be a test on its own
response = self.app.get(route_path(
'pullrequest_new',
repo_name=target.repo_name))
assert response.status_int == 200
assert 'Pull request updated to' in response.body
assert 'with 1 added, 0 removed commits.' in response.body
def test_update_target_revision(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'ancestor-new', 'parents': ['ancestor']},
{'message': 'change-rebased'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name, commit_id=commit_ids['change'])
pull_request.target_repo = target
# TODO: johbo: Target ref should be branch based, since tip can jump
# from branch to branch
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# target has ancestor - ancestor-new
# source has ancestor - ancestor-new - change-rebased
backend.pull_heads(target, heads=['ancestor-new'])
backend.pull_heads(source, heads=['change-rebased'])
# update PR
self.app.post(
route_path('pullrequest_update',
repo_name=target.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token},
status=200)
# check that we have now both revisions
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [commit_ids['change-rebased']]
assert pull_request.target_ref == 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor-new'])
# TODO: johbo: This should be a test on its own
response = self.app.get(route_path(
'pullrequest_new',
repo_name=target.repo_name))
assert response.status_int == 200
assert 'Pull request updated to' in response.body
assert 'with 1 added, 1 removed commits.' in response.body
git: use force fetch and update for target ref. This solves a case...
r2784 def test_update_target_revision_with_removal_of_1_commit_git(self, backend_git, csrf_token):
backend = backend_git
commits = [
{'message': 'master-commit-1'},
{'message': 'master-commit-2-change-1'},
{'message': 'master-commit-3-change-2'},
{'message': 'feat-commit-1', 'parents': ['master-commit-1']},
{'message': 'feat-commit-2'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['master-commit-3-change-2'])
source = backend.create_repo(heads=['feat-commit-2'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['master-commit-3-change-2'])
pull_request.target_repo = target
# TODO: johbo: Target ref should be branch based, since tip can jump
# from branch to branch
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['feat-commit-2'])
pull_request.revisions = [
commit_ids['feat-commit-1'],
commit_ids['feat-commit-2']
]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# PR is created, now we simulate a force-push into target,
# that drops a 2 last commits
vcsrepo = target.scm_instance()
vcsrepo.config.clear_section('hooks')
vcsrepo.run_git_command(['reset', '--soft', 'HEAD~2'])
# update PR
self.app.post(
route_path('pullrequest_update',
repo_name=target.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token},
status=200)
response = self.app.get(route_path(
'pullrequest_new',
repo_name=target.repo_name))
assert response.status_int == 200
response.mustcontain('Pull request updated to')
response.mustcontain('with 0 added, 0 removed commits.')
pull-requests: migrated code from pylons to pyramid
r1974 def test_update_of_ancestor_reference(self, backend, csrf_token):
commits = [
{'message': 'ancestor'},
{'message': 'change'},
{'message': 'change-2'},
{'message': 'ancestor-new', 'parents': ['ancestor']},
{'message': 'change-rebased'},
]
commit_ids = backend.create_master_repo(commits)
target = backend.create_repo(heads=['ancestor'])
source = backend.create_repo(heads=['change'])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
# TODO: johbo: Make sure that we write the source ref this way!
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['change'])
pull_request.target_repo = target
# TODO: johbo: Target ref should be branch based, since tip can jump
# from branch to branch
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor'])
pull_request.revisions = [commit_ids['change']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
# target has ancestor - ancestor-new
# source has ancestor - ancestor-new - change-rebased
backend.pull_heads(target, heads=['ancestor-new'])
backend.pull_heads(source, heads=['change-rebased'])
# update PR
self.app.post(
route_path('pullrequest_update',
repo_name=target.repo_name,
pull_request_id=pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token},
status=200)
# Expect the target reference to be updated correctly
pull_request = PullRequest.get(pull_request_id)
assert pull_request.revisions == [commit_ids['change-rebased']]
expected_target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend.default_branch_name,
commit_id=commit_ids['ancestor-new'])
assert pull_request.target_ref == expected_target_ref
def test_remove_pull_request_branch(self, backend_git, csrf_token):
branch_name = 'development'
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'branch': branch_name},
]
repo = backend_git.create_repo(commits)
commit_ids = backend_git.commit_ids
pull_request = PullRequest()
pull_request.source_repo = repo
pull_request.target_repo = repo
pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
branch=branch_name, commit_id=commit_ids['new-feature'])
pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
branch=backend_git.default_branch_name,
commit_id=commit_ids['old-feature'])
pull_request.revisions = [commit_ids['new-feature']]
pull_request.title = u"Test"
pull_request.description = u"Description"
pull_request.author = UserModel().get_by_username(
TEST_USER_ADMIN_LOGIN)
Session().add(pull_request)
Session().commit()
vcs = repo.scm_instance()
vcs.remove_ref('refs/heads/{}'.format(branch_name))
response = self.app.get(route_path(
'pullrequest_show',
repo_name=repo.repo_name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
assert_response.element_contains(
'#changeset_compare_view_content .alert',
'This pull request cannot be displayed, because one or more'
' commits no longer exist in the source repository.')
def test_strip_commits_from_pull_request(
self, backend, pr_util, csrf_token):
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'parents': ['initial-commit']},
]
pull_request = pr_util.create_pull_request(
commits, target_head='initial-commit', source_head='new-feature',
revisions=['new-feature'])
vcs = pr_util.source_repository.scm_instance()
if backend.alias == 'git':
vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
else:
vcs.strip(pr_util.commit_ids['new-feature'])
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pr_util.target_repository.repo_name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
assert_response.element_contains(
'#changeset_compare_view_content .alert',
'This pull request cannot be displayed, because one or more'
' commits no longer exist in the source repository.')
assert_response.element_contains(
'#update_commits',
'Update commits')
def test_strip_commits_and_update(
self, backend, pr_util, csrf_token):
commits = [
{'message': 'initial-commit'},
{'message': 'old-feature'},
{'message': 'new-feature', 'parents': ['old-feature']},
]
pull_request = pr_util.create_pull_request(
commits, target_head='old-feature', source_head='new-feature',
revisions=['new-feature'], mergeable=True)
vcs = pr_util.source_repository.scm_instance()
if backend.alias == 'git':
vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
else:
vcs.strip(pr_util.commit_ids['new-feature'])
response = self.app.post(
route_path('pullrequest_update',
repo_name=pull_request.target_repo.repo_name,
pull_request_id=pull_request.pull_request_id),
params={'update_commits': 'true',
'csrf_token': csrf_token})
assert response.status_int == 200
assert response.body == 'true'
# Make sure that after update, it won't raise 500 errors
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pr_util.target_repository.repo_name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
assert_response.element_contains(
'#changeset_compare_view_content .alert strong',
'Missing commits')
def test_branch_is_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'branch:origin:1234567890abcdef'
pull_request.target_ref = 'branch:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
origin_children = origin.getchildren()
assert len(origin_children) == 1
target = assert_response.get_element('.pr-targetinfo .tag')
target_children = target.getchildren()
assert len(target_children) == 1
expected_origin_link = route_path(
'repo_changelog',
repo_name=pull_request.source_repo.scm_instance().name,
params=dict(branch='origin'))
expected_target_link = route_path(
'repo_changelog',
repo_name=pull_request.target_repo.scm_instance().name,
params=dict(branch='target'))
assert origin_children[0].attrib['href'] == expected_origin_link
assert origin_children[0].text == 'branch: origin'
assert target_children[0].attrib['href'] == expected_target_link
assert target_children[0].text == 'branch: target'
def test_bookmark_is_not_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'bookmark:origin:1234567890abcdef'
pull_request.target_ref = 'bookmark:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
assert origin.text.strip() == 'bookmark: origin'
assert origin.getchildren() == []
target = assert_response.get_element('.pr-targetinfo .tag')
assert target.text.strip() == 'bookmark: target'
assert target.getchildren() == []
def test_tag_is_not_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
pull_request.source_ref = 'tag:origin:1234567890abcdef'
pull_request.target_ref = 'tag:target:abcdef1234567890'
Session().add(pull_request)
Session().commit()
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
assert response.status_int == 200
assert_response = AssertResponse(response)
origin = assert_response.get_element('.pr-origininfo .tag')
assert origin.text.strip() == 'tag: origin'
assert origin.getchildren() == []
target = assert_response.get_element('.pr-targetinfo .tag')
assert target.text.strip() == 'tag: target'
assert target.getchildren() == []
@pytest.mark.parametrize('mergeable', [True, False])
def test_shadow_repository_link(
self, mergeable, pr_util, http_host_only_stub):
"""
Check that the pull request summary page displays a link to the shadow
repository if the pull request is mergeable. If it is not mergeable
the link should not be displayed.
"""
pull_request = pr_util.create_pull_request(
mergeable=mergeable, enable_notifications=False)
target_repo = pull_request.target_repo.scm_instance()
pr_id = pull_request.pull_request_id
shadow_url = '{host}/{repo}/pull-request/{pr_id}/repository'.format(
host=http_host_only_stub, repo=target_repo.name, pr_id=pr_id)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=target_repo.name,
pull_request_id=pr_id))
assertr = AssertResponse(response)
if mergeable:
pull-requests: fixed tests.
r1977 assertr.element_value_contains('input.pr-mergeinfo', shadow_url)
assertr.element_value_contains('input.pr-mergeinfo ', 'pr-merge')
pull-requests: migrated code from pylons to pyramid
r1974 else:
pull-requests: fixed tests.
r1977 assertr.no_element_exists('.pr-mergeinfo')
pull-requests: migrated code from pylons to pyramid
r1974
@pytest.mark.usefixtures('app')
@pytest.mark.backends("git", "hg")
class TestPullrequestsControllerDelete(object):
def test_pull_request_delete_button_permissions_admin(
self, autologin_user, user_admin, pr_util):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('id="delete_pullrequest"')
response.mustcontain('Confirm to delete this pull request')
def test_pull_request_delete_button_permissions_owner(
self, autologin_regular_user, user_regular, pr_util):
pull_request = pr_util.create_pull_request(
author=user_regular.username, enable_notifications=False)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('id="delete_pullrequest"')
response.mustcontain('Confirm to delete this pull request')
def test_pull_request_delete_button_permissions_forbidden(
self, autologin_regular_user, user_regular, user_admin, pr_util):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain(no=['id="delete_pullrequest"'])
response.mustcontain(no=['Confirm to delete this pull request'])
def test_pull_request_delete_button_permissions_can_update_cannot_delete(
self, autologin_regular_user, user_regular, user_admin, pr_util,
user_util):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
user_util.grant_user_permission_to_repo(
pull_request.target_repo, user_regular,
'repository.write')
response = self.app.get(route_path(
'pullrequest_show',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id))
response.mustcontain('id="open_edit_pullrequest"')
response.mustcontain('id="delete_pullrequest"')
response.mustcontain(no=['Confirm to delete this pull request'])
def test_delete_comment_returns_404_if_comment_does_not_exist(
pull-requests: fix deletion of comments and make the test actually test this !...
r1978 self, autologin_user, pr_util, user_admin, csrf_token, xhr_header):
pull-requests: migrated code from pylons to pyramid
r1974
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
pull-requests: fix deletion of comments and make the test actually test this !...
r1978 self.app.post(
route_path(
pull-requests: migrated code from pylons to pyramid
r1974 'pullrequest_comment_delete',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id,
pull-requests: fix deletion of comments and make the test actually test this !...
r1978 comment_id=1024404),
extra_environ=xhr_header,
params={'csrf_token': csrf_token},
status=404
)
def test_delete_comment(
self, autologin_user, pr_util, user_admin, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request(
author=user_admin.username, enable_notifications=False)
comment = pr_util.create_comment()
comment_id = comment.comment_id
response = self.app.post(
route_path(
'pullrequest_comment_delete',
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id,
comment_id=comment_id),
extra_environ=xhr_header,
params={'csrf_token': csrf_token},
status=200
)
assert response.body == 'true'
pull-requests: migrated code from pylons to pyramid
r1974
def assert_pull_request_status(pull_request, expected_status):
status = ChangesetStatusModel().calculated_review_status(
pull_request=pull_request)
assert status == expected_status
@pytest.mark.parametrize('route', ['pullrequest_new', 'pullrequest_create'])
@pytest.mark.usefixtures("autologin_user")
def test_forbidde_to_repo_summary_for_svn_repositories(backend_svn, app, route):
response = app.get(
route_path(route, repo_name=backend_svn.repo_name), status=404)