# Copyright (C) 2010-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import pytest from rhodecode.model.db import User from rhodecode.model.pull_request import PullRequestModel from rhodecode.model.repo import RepoModel from rhodecode.model.user import UserModel from rhodecode.tests import TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN from rhodecode.api.tests.utils import build_data, api_call, assert_error @pytest.mark.usefixtures("testuser_api", "app") class TestCreatePullRequestApi(object): finalizers = [] def teardown_method(self, method): if self.finalizers: for finalizer in self.finalizers: finalizer() self.finalizers = [] def test_create_with_wrong_data(self): required_data = { 'source_repo': 'tests/source_repo', 'target_repo': 'tests/target_repo', 'source_ref': 'branch:default:initial', 'target_ref': 'branch:default:new-feature', } for key in required_data: data = required_data.copy() data.pop(key) id_, params = build_data( self.apikey, 'create_pull_request', **data) response = api_call(self.app, params) expected = 'Missing non optional `{}` arg in JSON DATA'.format(key) assert_error(id_, expected, given=response.body) @pytest.mark.backends("git", "hg") @pytest.mark.parametrize('source_ref', [ 'bookmarg:default:initial' ]) def test_create_with_wrong_refs_data(self, backend, source_ref): data = self._prepare_data(backend) data['source_ref'] = source_ref id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected = "Ref `{}` type is not allowed. " \ "Only:['bookmark', 'book', 'tag', 'branch'] " \ "are possible.".format(source_ref) assert_error(id_, expected, given=response.body) @pytest.mark.backends("git", "hg") def test_create_with_correct_data(self, backend): data = self._prepare_data(backend) RepoModel().revoke_user_permission( self.source.repo_name, User.DEFAULT_USER) id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = "Created new pull request `{title}`".format( title=data['title']) result = response.json assert result['error'] is None assert result['result']['msg'] == expected_message pull_request_id = result['result']['pull_request_id'] pull_request = PullRequestModel().get(pull_request_id) assert pull_request.title == data['title'] assert pull_request.description == data['description'] assert pull_request.source_ref == data['source_ref'] assert pull_request.target_ref == data['target_ref'] assert pull_request.source_repo.repo_name == data['source_repo'] assert pull_request.target_repo.repo_name == data['target_repo'] assert pull_request.revisions == [self.commit_ids['change']] assert len(pull_request.reviewers) == 1 @pytest.mark.backends("git", "hg") def test_create_with_empty_description(self, backend): data = self._prepare_data(backend) data.pop('description') id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = "Created new pull request `{title}`".format( title=data['title']) result = response.json assert result['error'] is None assert result['result']['msg'] == expected_message pull_request_id = result['result']['pull_request_id'] pull_request = PullRequestModel().get(pull_request_id) assert pull_request.description == '' @pytest.mark.backends("git", "hg") def test_create_with_empty_title(self, backend): data = self._prepare_data(backend) data.pop('title') id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) result = response.json pull_request_id = result['result']['pull_request_id'] pull_request = PullRequestModel().get(pull_request_id) data['ref'] = backend.default_branch_name title = '{source_repo}#{ref} to {target_repo}'.format(**data) assert pull_request.title == title @pytest.mark.backends("git", "hg") def test_create_with_reviewers_specified_by_names( self, backend, no_notifications): data = self._prepare_data(backend) reviewers = [ {'username': TEST_USER_REGULAR_LOGIN, 'reasons': ['{} added manually'.format(TEST_USER_REGULAR_LOGIN)]}, {'username': TEST_USER_ADMIN_LOGIN, 'reasons': ['{} added manually'.format(TEST_USER_ADMIN_LOGIN)], 'mandatory': True}, ] data['reviewers'] = reviewers id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = "Created new pull request `{title}`".format( title=data['title']) result = response.json assert result['error'] is None assert result['result']['msg'] == expected_message pull_request_id = result['result']['pull_request_id'] pull_request = PullRequestModel().get(pull_request_id) actual_reviewers = [] for rev in pull_request.reviewers: entry = { 'username': rev.user.username, 'reasons': rev.reasons, } if rev.mandatory: entry['mandatory'] = rev.mandatory actual_reviewers.append(entry) owner_username = pull_request.target_repo.user.username for spec_reviewer in reviewers[::]: # default reviewer will be added who is an owner of the repo # this get's overridden by a add owner to reviewers rule if spec_reviewer['username'] == owner_username: spec_reviewer['reasons'] = [u'Default reviewer', u'Repository owner'] # since owner is more important, we don't inherit mandatory flag del spec_reviewer['mandatory'] assert sorted(actual_reviewers, key=lambda e: e['username']) \ == sorted(reviewers, key=lambda e: e['username']) @pytest.mark.backends("git", "hg") def test_create_with_reviewers_specified_by_ids( self, backend, no_notifications): data = self._prepare_data(backend) reviewers = [ {'username': UserModel().get_by_username( TEST_USER_REGULAR_LOGIN).user_id, 'reasons': ['added manually']}, {'username': UserModel().get_by_username( TEST_USER_ADMIN_LOGIN).user_id, 'reasons': ['added manually']}, ] data['reviewers'] = reviewers id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = "Created new pull request `{title}`".format( title=data['title']) result = response.json assert result['error'] is None assert result['result']['msg'] == expected_message pull_request_id = result['result']['pull_request_id'] pull_request = PullRequestModel().get(pull_request_id) actual_reviewers = [] for rev in pull_request.reviewers: entry = { 'username': rev.user.user_id, 'reasons': rev.reasons, } if rev.mandatory: entry['mandatory'] = rev.mandatory actual_reviewers.append(entry) owner_user_id = pull_request.target_repo.user.user_id for spec_reviewer in reviewers[::]: # default reviewer will be added who is an owner of the repo # this get's overridden by a add owner to reviewers rule if spec_reviewer['username'] == owner_user_id: spec_reviewer['reasons'] = [u'Default reviewer', u'Repository owner'] assert sorted(actual_reviewers, key=lambda e: e['username']) \ == sorted(reviewers, key=lambda e: e['username']) @pytest.mark.backends("git", "hg") def test_create_fails_when_the_reviewer_is_not_found(self, backend): data = self._prepare_data(backend) data['reviewers'] = [{'username': 'somebody'}] id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = 'user `somebody` does not exist' assert_error(id_, expected_message, given=response.body) @pytest.mark.backends("git", "hg") def test_cannot_create_with_reviewers_in_wrong_format(self, backend): data = self._prepare_data(backend) reviewers = ','.join([TEST_USER_REGULAR_LOGIN, TEST_USER_ADMIN_LOGIN]) data['reviewers'] = reviewers id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = {u'': '"test_regular,test_admin" is not iterable'} assert_error(id_, expected_message, given=response.body) @pytest.mark.backends("git", "hg") def test_create_with_no_commit_hashes(self, backend): data = self._prepare_data(backend) expected_source_ref = data['source_ref'] expected_target_ref = data['target_ref'] data['source_ref'] = 'branch:{}'.format(backend.default_branch_name) data['target_ref'] = 'branch:{}'.format(backend.default_branch_name) id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = "Created new pull request `{title}`".format( title=data['title']) result = response.json assert result['result']['msg'] == expected_message pull_request_id = result['result']['pull_request_id'] pull_request = PullRequestModel().get(pull_request_id) assert pull_request.source_ref == expected_source_ref assert pull_request.target_ref == expected_target_ref @pytest.mark.backends("git", "hg") @pytest.mark.parametrize("data_key", ["source_repo", "target_repo"]) def test_create_fails_with_wrong_repo(self, backend, data_key): repo_name = 'fake-repo' data = self._prepare_data(backend) data[data_key] = repo_name id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = 'repository `{}` does not exist'.format(repo_name) assert_error(id_, expected_message, given=response.body) @pytest.mark.backends("git", "hg") @pytest.mark.parametrize("data_key", ["source_ref", "target_ref"]) def test_create_fails_with_non_existing_branch(self, backend, data_key): branch_name = 'test-branch' data = self._prepare_data(backend) data[data_key] = "branch:{}".format(branch_name) id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = 'The specified value:{type}:`{name}` ' \ 'does not exist, or is not allowed.'.format(type='branch', name=branch_name) assert_error(id_, expected_message, given=response.body) @pytest.mark.backends("git", "hg") @pytest.mark.parametrize("data_key", ["source_ref", "target_ref"]) def test_create_fails_with_ref_in_a_wrong_format(self, backend, data_key): data = self._prepare_data(backend) ref = 'stange-ref' data[data_key] = ref id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = ( 'Ref `{ref}` given in a wrong format. Please check the API' ' documentation for more details'.format(ref=ref)) assert_error(id_, expected_message, given=response.body) @pytest.mark.backends("git", "hg") @pytest.mark.parametrize("data_key", ["source_ref", "target_ref"]) def test_create_fails_with_non_existing_ref(self, backend, data_key): commit_id = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa10' ref = self._get_full_ref(backend, commit_id) data = self._prepare_data(backend) data[data_key] = ref id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = 'Ref `{}` does not exist'.format(ref) assert_error(id_, expected_message, given=response.body) @pytest.mark.backends("git", "hg") def test_create_fails_when_no_revisions(self, backend): data = self._prepare_data(backend, source_head='initial') id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = 'no commits found for merge between specified references' assert_error(id_, expected_message, given=response.body) @pytest.mark.backends("git", "hg") def test_create_fails_when_no_permissions(self, backend): data = self._prepare_data(backend) RepoModel().revoke_user_permission( self.source.repo_name, self.test_user) RepoModel().revoke_user_permission( self.source.repo_name, User.DEFAULT_USER) id_, params = build_data( self.apikey_regular, 'create_pull_request', **data) response = api_call(self.app, params) expected_message = 'repository `{}` does not exist'.format( self.source.repo_name) assert_error(id_, expected_message, given=response.body) def _prepare_data( self, backend, source_head='change', target_head='initial'): commits = [ {'message': 'initial'}, {'message': 'change'}, {'message': 'new-feature', 'parents': ['initial'], 'branch': 'feature'}, ] self.commit_ids = backend.create_master_repo(commits) self.source = backend.create_repo(heads=[source_head]) self.target = backend.create_repo(heads=[target_head]) data = { 'source_repo': self.source.repo_name, 'target_repo': self.target.repo_name, 'source_ref': self._get_full_ref( backend, self.commit_ids[source_head]), 'target_ref': self._get_full_ref( backend, self.commit_ids[target_head]), 'title': 'Test PR 1', 'description': 'Test' } RepoModel().grant_user_permission( self.source.repo_name, self.TEST_USER_LOGIN, 'repository.read') return data def _get_full_ref(self, backend, commit_id): return 'branch:{branch}:{commit_id}'.format( branch=backend.default_branch_name, commit_id=commit_id)