# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see https://www.gnu.org/licenses/.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
Tests for ``PullRequestModel``: listing/counting, merge status, merging,
updating commits, version snapshots and inline/outdated comment handling.

NOTE(review): the line structure of this module was reconstructed from a
whitespace-collapsed copy. The extent of ``with`` blocks and the line breaks
inside multi-line string literals should be confirmed against upstream.
"""

import mock
import pytest
import textwrap

import rhodecode
from rhodecode.lib.vcs.backends import get_backend
from rhodecode.lib.vcs.backends.base import (
    MergeResponse, MergeFailureReason, Reference)
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.model.comment import CommentsModel
from rhodecode.model.db import PullRequest, Session
from rhodecode.model.pull_request import PullRequestModel
from rhodecode.model.user import UserModel
from rhodecode.tests import TEST_USER_ADMIN_LOGIN
from rhodecode.lib.str_utils import safe_str


pytestmark = [
    pytest.mark.backends("git", "hg"),
]


@pytest.mark.usefixtures('config_stub')
class TestPullRequestModel(object):
    """
    Tests of `PullRequestModel` running against a backend whose `merge` and
    `cleanup_merge_workspace` methods are replaced by mocks.
    """

    @pytest.fixture()
    def pull_request(self, request, backend, pr_util):
        """
        A pull request combined with multiple patches.

        Starts the patchers for the backend merge, workspace cleanup, status
        change, notification, route generation, pull request hook and cache
        invalidation, then creates a mergeable pull request. The mocks and a
        few derived ids are stored on `self` for use inside the tests.
        """
        BackendClass = get_backend(backend.alias)
        merge_resp = MergeResponse(
            False, False, None, MergeFailureReason.UNKNOWN,
            metadata={'exception': 'MockError'})
        self.merge_patcher = mock.patch.object(
            BackendClass, 'merge', return_value=merge_resp)
        self.workspace_remove_patcher = mock.patch.object(
            BackendClass, 'cleanup_merge_workspace')

        self.workspace_remove_mock = self.workspace_remove_patcher.start()
        self.merge_mock = self.merge_patcher.start()
        self.comment_patcher = mock.patch(
            'rhodecode.model.changeset_status.ChangesetStatusModel.set_status')
        self.comment_patcher.start()
        self.notification_patcher = mock.patch(
            'rhodecode.model.notification.NotificationModel.create')
        self.notification_patcher.start()
        self.helper_patcher = mock.patch(
            'rhodecode.lib.helpers.route_path')
        self.helper_patcher.start()

        self.hook_patcher = mock.patch.object(PullRequestModel,
                                              'trigger_pull_request_hook')
        self.hook_mock = self.hook_patcher.start()

        self.invalidation_patcher = mock.patch(
            'rhodecode.model.pull_request.ScmModel.mark_for_invalidation')
        self.invalidation_mock = self.invalidation_patcher.start()

        self.pull_request = pr_util.create_pull_request(
            mergeable=True, name_suffix=u'ąć')
        self.source_commit = self.pull_request.source_ref_parts.commit_id
        self.target_commit = self.pull_request.target_ref_parts.commit_id
        self.workspace_id = 'pr-%s' % self.pull_request.pull_request_id
        self.repo_id = self.pull_request.target_repo.repo_id

        @request.addfinalizer
        def cleanup_pull_request():
            calls = [mock.call(
                self.pull_request, self.pull_request.author, 'create')]
            try:
                self.hook_mock.assert_has_calls(calls)
            finally:
                # Always stop the patchers, even when the assertion above
                # fails, so the mocks cannot leak into later tests.
                self.workspace_remove_patcher.stop()
                self.merge_patcher.stop()
                self.comment_patcher.stop()
                self.notification_patcher.stop()
                self.helper_patcher.stop()
                self.hook_patcher.stop()
                self.invalidation_patcher.stop()

        return self.pull_request

    def test_get_all(self, pull_request):
        """`get_all` returns a list holding the single created pull request."""
        prs = PullRequestModel().get_all(pull_request.target_repo)
        assert isinstance(prs, list)
        assert len(prs) == 1

    def test_count_all(self, pull_request):
        """`count_all` reports exactly one pull request."""
        pr_count = PullRequestModel().count_all(pull_request.target_repo)
        assert pr_count == 1

    def test_get_awaiting_review(self, pull_request):
        """`get_awaiting_review` returns a one element list."""
        prs = PullRequestModel().get_awaiting_review(pull_request.target_repo)
        assert isinstance(prs, list)
        assert len(prs) == 1

    def test_count_awaiting_review(self, pull_request):
        """`count_awaiting_review` reports exactly one pull request."""
        pr_count = PullRequestModel().count_awaiting_review(
            pull_request.target_repo)
        assert pr_count == 1

    def test_get_awaiting_my_review(self, pull_request):
        """After adding the author as reviewer the PR awaits their review."""
        PullRequestModel().update_reviewers(
            pull_request,
            [(pull_request.author, ['author'], False, 'reviewer', [])],
            pull_request.author)
        Session().commit()

        prs = PullRequestModel().get_awaiting_my_review(
            pull_request.target_repo.repo_name,
            user_id=pull_request.author.user_id)
        assert isinstance(prs, list)
        assert len(prs) == 1

    def test_count_awaiting_my_review(self, pull_request):
        """Counting variant of `test_get_awaiting_my_review`."""
        PullRequestModel().update_reviewers(
            pull_request,
            [(pull_request.author, ['author'], False, 'reviewer', [])],
            pull_request.author)
        Session().commit()

        pr_count = PullRequestModel().count_awaiting_my_review(
            pull_request.target_repo.repo_name,
            user_id=pull_request.author.user_id)
        assert pr_count == 1

    def test_delete_calls_cleanup_merge(self, pull_request):
        """Deleting a pull request cleans up its merge workspace."""
        # Read the id up-front, the pull request is deleted afterwards.
        repo_id = pull_request.target_repo.repo_id
        PullRequestModel().delete(pull_request, pull_request.author)
        Session().commit()

        self.workspace_remove_mock.assert_called_once_with(
            repo_id, self.workspace_id)

    def test_close_calls_cleanup_and_hook(self, pull_request):
        """Closing cleans up the merge workspace and fires the 'close' hook."""
        PullRequestModel().close_pull_request(
            pull_request, pull_request.author)
        Session().commit()

        repo_id = pull_request.target_repo.repo_id

        self.workspace_remove_mock.assert_called_once_with(
            repo_id, self.workspace_id)
        self.hook_mock.assert_called_with(
            self.pull_request, self.pull_request.author, 'close')

    def test_merge_status(self, pull_request):
        """
        A successful dry-run merge is stored on the pull request and the
        second `merge_status` call does not invoke the backend merge again.
        """
        self.merge_mock.return_value = MergeResponse(
            True, False, None, MergeFailureReason.NONE)

        assert pull_request._last_merge_source_rev is None
        assert pull_request._last_merge_target_rev is None
        assert pull_request.last_merge_status is None

        merge_response, status, msg = PullRequestModel().merge_status(pull_request)
        assert status is True
        assert msg == 'This pull request can be automatically merged.'
        self.merge_mock.assert_called_with(
            self.repo_id, self.workspace_id,
            pull_request.target_ref_parts,
            pull_request.source_repo.scm_instance(),
            pull_request.source_ref_parts, dry_run=True,
            use_rebase=False, close_branch=False)

        assert pull_request._last_merge_source_rev == self.source_commit
        assert pull_request._last_merge_target_rev == self.target_commit
        assert pull_request.last_merge_status is MergeFailureReason.NONE

        self.merge_mock.reset_mock()
        merge_response, status, msg = PullRequestModel().merge_status(pull_request)
        assert status is True
        assert msg == 'This pull request can be automatically merged.'
        assert self.merge_mock.called is False

    def test_merge_status_known_failure(self, pull_request):
        """
        A `MERGE_FAILED` dry-run result is stored as well, so the second
        `merge_status` call does not invoke the backend merge again.
        """
        self.merge_mock.return_value = MergeResponse(
            False, False, None, MergeFailureReason.MERGE_FAILED,
            metadata={'unresolved_files': 'file1'})

        assert pull_request._last_merge_source_rev is None
        assert pull_request._last_merge_target_rev is None
        assert pull_request.last_merge_status is None

        merge_response, status, msg = PullRequestModel().merge_status(pull_request)
        assert status is False
        assert msg == 'This pull request cannot be merged because of merge conflicts. file1'
        self.merge_mock.assert_called_with(
            self.repo_id, self.workspace_id,
            pull_request.target_ref_parts,
            pull_request.source_repo.scm_instance(),
            pull_request.source_ref_parts, dry_run=True,
            use_rebase=False, close_branch=False)

        assert pull_request._last_merge_source_rev == self.source_commit
        assert pull_request._last_merge_target_rev == self.target_commit
        assert pull_request.last_merge_status is MergeFailureReason.MERGE_FAILED

        self.merge_mock.reset_mock()
        merge_response, status, msg = PullRequestModel().merge_status(pull_request)
        assert status is False
        assert msg == 'This pull request cannot be merged because of merge conflicts. file1'
        assert self.merge_mock.called is False

    def test_merge_status_unknown_failure(self, pull_request):
        """
        An `UNKNOWN` failure is not stored on the pull request, so the second
        `merge_status` call invokes the backend merge again.
        """
        self.merge_mock.return_value = MergeResponse(
            False, False, None, MergeFailureReason.UNKNOWN,
            metadata={'exception': 'MockError'})

        assert pull_request._last_merge_source_rev is None
        assert pull_request._last_merge_target_rev is None
        assert pull_request.last_merge_status is None

        merge_response, status, msg = PullRequestModel().merge_status(pull_request)
        assert status is False
        assert msg == (
            'This pull request cannot be merged because of an unhandled exception. '
            'MockError')
        self.merge_mock.assert_called_with(
            self.repo_id, self.workspace_id,
            pull_request.target_ref_parts,
            pull_request.source_repo.scm_instance(),
            pull_request.source_ref_parts, dry_run=True,
            use_rebase=False, close_branch=False)

        assert pull_request._last_merge_source_rev is None
        assert pull_request._last_merge_target_rev is None
        assert pull_request.last_merge_status is None

        self.merge_mock.reset_mock()
        merge_response, status, msg = PullRequestModel().merge_status(pull_request)
        assert status is False
        assert msg == (
            'This pull request cannot be merged because of an unhandled exception. '
            'MockError')
        assert self.merge_mock.called is True

    def test_merge_status_when_target_is_locked(self, pull_request):
        """A locked target repository is reported as not mergeable."""
        pull_request.target_repo.locked = [1, u'12345.50', 'lock_web']
        merge_response, status, msg = PullRequestModel().merge_status(pull_request)
        assert status is False
        assert msg == (
            'This pull request cannot be merged because the target repository '
            'is locked by user:1.')

    def test_merge_status_requirements_check_target(self, pull_request):
        """Largefiles reported only for the source repo fails the target check."""

        def has_largefiles(self, repo):
            return repo == pull_request.source_repo

        patcher = mock.patch.object(PullRequestModel, '_has_largefiles', has_largefiles)
        with patcher:
            merge_response, status, msg = PullRequestModel().merge_status(pull_request)

        assert status is False
        assert msg == 'Target repository large files support is disabled.'

    def test_merge_status_requirements_check_source(self, pull_request):
        """Largefiles reported only for the target repo fails the source check."""

        def has_largefiles(self, repo):
            return repo == pull_request.target_repo

        patcher = mock.patch.object(PullRequestModel, '_has_largefiles', has_largefiles)
        with patcher:
            merge_response, status, msg = PullRequestModel().merge_status(pull_request)

        assert status is False
        assert msg == 'Source repository large files support is disabled.'

    def test_merge(self, pull_request, merge_extras):
        """
        A successful merge calls the backend with the generated commit
        message, invalidates the target repo, fires the 'merge' hook and
        stores the merge revision on the pull request.
        """
        user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
        merge_ref = Reference(
            'type', 'name', '6126b7bfcc82ad2d3deaee22af926b082ce54cc6')
        self.merge_mock.return_value = MergeResponse(
            True, True, merge_ref, MergeFailureReason.NONE)

        merge_extras['repository'] = pull_request.target_repo.repo_name
        PullRequestModel().merge_repo(
            pull_request, pull_request.author, extras=merge_extras)
        Session().commit()

        message = (
            u'Merge pull request !{pr_id} from {source_repo} {source_ref_name}'
            u'\n\n {pr_title}'.format(
                pr_id=pull_request.pull_request_id,
                source_repo=safe_str(
                    pull_request.source_repo.scm_instance().name),
                source_ref_name=pull_request.source_ref_parts.name,
                pr_title=safe_str(pull_request.title)
            )
        )
        self.merge_mock.assert_called_with(
            self.repo_id, self.workspace_id,
            pull_request.target_ref_parts,
            pull_request.source_repo.scm_instance(),
            pull_request.source_ref_parts,
            user_name=user.short_contact, user_email=user.email, message=message,
            use_rebase=False, close_branch=False
        )
        self.invalidation_mock.assert_called_once_with(
            pull_request.target_repo.repo_name)

        self.hook_mock.assert_called_with(
            self.pull_request, self.pull_request.author, 'merge')

        pull_request = PullRequest.get(pull_request.pull_request_id)
        assert pull_request.merge_rev == '6126b7bfcc82ad2d3deaee22af926b082ce54cc6'

    def test_merge_with_status_lock(self, pull_request, merge_extras):
        """
        Same as `test_merge`, but the merge runs while the pull request is in
        `STATE_UPDATING`; afterwards the state is `STATE_CREATED` again.
        """
        user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
        merge_ref = Reference(
            'type', 'name', '6126b7bfcc82ad2d3deaee22af926b082ce54cc6')
        self.merge_mock.return_value = MergeResponse(
            True, True, merge_ref, MergeFailureReason.NONE)

        merge_extras['repository'] = pull_request.target_repo.repo_name

        # NOTE(review): assumes the merge and commit run inside the state
        # context manager - confirm against upstream.
        with pull_request.set_state(PullRequest.STATE_UPDATING):
            assert pull_request.pull_request_state == PullRequest.STATE_UPDATING
            PullRequestModel().merge_repo(
                pull_request, pull_request.author, extras=merge_extras)
            Session().commit()

        assert pull_request.pull_request_state == PullRequest.STATE_CREATED

        message = (
            u'Merge pull request !{pr_id} from {source_repo} {source_ref_name}'
            u'\n\n {pr_title}'.format(
                pr_id=pull_request.pull_request_id,
                source_repo=safe_str(
                    pull_request.source_repo.scm_instance().name),
                source_ref_name=pull_request.source_ref_parts.name,
                pr_title=safe_str(pull_request.title)
            )
        )
        self.merge_mock.assert_called_with(
            self.repo_id, self.workspace_id,
            pull_request.target_ref_parts,
            pull_request.source_repo.scm_instance(),
            pull_request.source_ref_parts,
            user_name=user.short_contact, user_email=user.email, message=message,
            use_rebase=False, close_branch=False
        )
        self.invalidation_mock.assert_called_once_with(
            pull_request.target_repo.repo_name)

        self.hook_mock.assert_called_with(
            self.pull_request, self.pull_request.author, 'merge')

        pull_request = PullRequest.get(pull_request.pull_request_id)
        assert pull_request.merge_rev == '6126b7bfcc82ad2d3deaee22af926b082ce54cc6'

    def test_merge_failed(self, pull_request, merge_extras):
        """
        A failed merge still calls the backend, but neither invalidates the
        target repo nor stores a merge revision.
        """
        user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
        merge_ref = Reference(
            'type', 'name', '6126b7bfcc82ad2d3deaee22af926b082ce54cc6')
        self.merge_mock.return_value = MergeResponse(
            False, False, merge_ref, MergeFailureReason.MERGE_FAILED)

        merge_extras['repository'] = pull_request.target_repo.repo_name
        PullRequestModel().merge_repo(
            pull_request, pull_request.author, extras=merge_extras)
        Session().commit()

        message = (
            u'Merge pull request !{pr_id} from {source_repo} {source_ref_name}'
            u'\n\n {pr_title}'.format(
                pr_id=pull_request.pull_request_id,
                source_repo=safe_str(
                    pull_request.source_repo.scm_instance().name),
                source_ref_name=pull_request.source_ref_parts.name,
                pr_title=safe_str(pull_request.title)
            )
        )
        self.merge_mock.assert_called_with(
            self.repo_id, self.workspace_id,
            pull_request.target_ref_parts,
            pull_request.source_repo.scm_instance(),
            pull_request.source_ref_parts,
            user_name=user.short_contact, user_email=user.email, message=message,
            use_rebase=False, close_branch=False
        )

        pull_request = PullRequest.get(pull_request.pull_request_id)
        assert self.invalidation_mock.called is False
        assert pull_request.merge_rev is None

    def test_get_commit_ids(self, pull_request):
        """`_get_commit_ids` requires a merge revision and appends it if new."""
        # The PR has been not merged yet, so expect an exception
        with pytest.raises(ValueError):
            PullRequestModel()._get_commit_ids(pull_request)

        # Merge revision is in the revisions list
        pull_request.merge_rev = pull_request.revisions[0]
        commit_ids = PullRequestModel()._get_commit_ids(pull_request)
        assert commit_ids == pull_request.revisions

        # Merge revision is not in the revisions list
        pull_request.merge_rev = 'f000' * 10
        commit_ids = PullRequestModel()._get_commit_ids(pull_request)
        assert commit_ids == pull_request.revisions + [pull_request.merge_rev]

    def test_get_diff_from_pr_version(self, pull_request):
        """The raw diff between target and source refs mentions `file_1`."""
        source_repo = pull_request.source_repo
        source_ref_id = pull_request.source_ref_parts.commit_id
        target_ref_id = pull_request.target_ref_parts.commit_id
        diff = PullRequestModel()._get_diff_from_pr_or_version(
            source_repo, source_ref_id, target_ref_id,
            hide_whitespace_changes=False, diff_context=6)
        assert b'file_1' in diff.raw.tobytes()

    def test_generate_title_returns_unicode(self):
        """The generated pull request title is a `str`."""
        title = PullRequestModel().generate_pullrequest_title(
            source='source-dummy',
            source_ref='source-ref-dummy',
            target='target-dummy',
        )
        assert isinstance(title, str)

    @pytest.mark.parametrize('title, has_wip', [
        ('hello', False),
        ('hello wip', False),
        ('hello wip: xxx', False),
        ('[wip] hello', True),
        ('[wip] hello', True),
        ('wip: hello', True),
        ('wip hello', True),
    ])
    def test_wip_title_marker(self, pull_request, title, has_wip):
        """`work_in_progress` is derived from a wip marker leading the title."""
        pull_request.title = title
        assert pull_request.work_in_progress == has_wip


@pytest.mark.usefixtures('config_stub')
class TestIntegrationMerge(object):
    """Merge tests running `merge_repo` without mocking the backend merge."""

    @pytest.mark.parametrize('extra_config', (
        {'vcs.hooks.protocol.v2': 'celery', 'vcs.hooks.direct_calls': False},
    ))
    def test_merge_triggers_push_hooks(
            self, pr_util, user_admin, capture_rcextensions, merge_extras,
            extra_config):
        """An executed merge records the pre-push and push hooks."""
        pull_request = pr_util.create_pull_request(
            approved=True, mergeable=True)
        # TODO: johbo: Needed for sqlite, try to find an automatic way for it
        merge_extras['repository'] = pull_request.target_repo.repo_name
        Session().commit()

        with mock.patch.dict(rhodecode.CONFIG, extra_config, clear=False):
            merge_state = PullRequestModel().merge_repo(
                pull_request, user_admin, extras=merge_extras)
            Session().commit()

        assert merge_state.executed
        assert '_pre_push_hook' in capture_rcextensions
        assert '_push_hook' in capture_rcextensions

    def test_merge_can_be_rejected_by_pre_push_hook(
            self, pr_util, user_admin, capture_rcextensions, merge_extras):
        """A pre-push hook raising `RepositoryError` prevents the merge."""
        pull_request = pr_util.create_pull_request(
            approved=True, mergeable=True)
        # TODO: johbo: Needed for sqlite, try to find an automatic way for it
        merge_extras['repository'] = pull_request.target_repo.repo_name
        Session().commit()

        with mock.patch('rhodecode.EXTENSIONS.PRE_PUSH_HOOK') as pre_pull:
            pre_pull.side_effect = RepositoryError("Disallow push!")
            merge_status = PullRequestModel().merge_repo(
                pull_request, user_admin, extras=merge_extras)
            Session().commit()

        assert not merge_status.executed
        # NOTE(review): test_merge_triggers_push_hooks checks the keys
        # '_pre_push_hook' / '_push_hook'; the keys used below differ, so
        # these two assertions may be vacuous - confirm.
        assert 'pre_push' not in capture_rcextensions
        assert 'post_push' not in capture_rcextensions

    def test_merge_fails_if_target_is_locked(
            self, pr_util, user_regular, merge_extras):
        """A target repository locked by another user blocks the merge."""
        pull_request = pr_util.create_pull_request(
            approved=True, mergeable=True)
        locked_by = [user_regular.user_id + 1, 12345.50, 'lock_web']
        pull_request.target_repo.locked = locked_by
        # TODO: johbo: Check if this can work based on the database, currently
        # all data is pre-computed, that's why just updating the DB is not
        # enough.
        merge_extras['locked_by'] = locked_by
        merge_extras['repository'] = pull_request.target_repo.repo_name
        # TODO: johbo: Needed for sqlite, try to find an automatic way for it
        Session().commit()
        merge_status = PullRequestModel().merge_repo(
            pull_request, user_regular, extras=merge_extras)
        Session().commit()

        assert not merge_status.executed


@pytest.mark.parametrize('use_outdated, inlines_count, outdated_count', [
    (False, 1, 0),
    (True, 0, 1),
])
def test_outdated_comments(
        pr_util, use_outdated, inlines_count, outdated_count, config_stub):
    """
    With outdated comments enabled, an inline comment on a file missing from
    the updated diff is counted as outdated; otherwise it stays visible.
    """
    pull_request = pr_util.create_pull_request()
    pr_util.create_inline_comment(file_path='not_in_updated_diff')

    with outdated_comments_patcher(use_outdated) as outdated_comment_mock:
        pr_util.add_one_commit()
        assert_inline_comments(
            pull_request, visible=inlines_count, outdated=outdated_count)
    outdated_comment_mock.assert_called_with(pull_request)


@pytest.mark.parametrize('mr_type, expected_msg', [
    (MergeFailureReason.NONE,
     'This pull request can be automatically merged.'),
    (MergeFailureReason.UNKNOWN,
     'This pull request cannot be merged because of an unhandled exception. CRASH'),
    (MergeFailureReason.MERGE_FAILED,
     'This pull request cannot be merged because of merge conflicts. CONFLICT_FILE'),
    (MergeFailureReason.PUSH_FAILED,
     'This pull request could not be merged because push to target:`some-repo@merge_commit` failed.'),
    (MergeFailureReason.TARGET_IS_NOT_HEAD,
     'This pull request cannot be merged because the target `ref_name` is not a head.'),
    (MergeFailureReason.HG_SOURCE_HAS_MORE_BRANCHES,
     'This pull request cannot be merged because the source contains more branches than the target.'),
    (MergeFailureReason.HG_TARGET_HAS_MULTIPLE_HEADS,
     'This pull request cannot be merged because the target `ref_name` has multiple heads: `a,b,c`.'),
    (MergeFailureReason.TARGET_IS_LOCKED,
     'This pull request cannot be merged because the target repository is locked by user:123.'),
    (MergeFailureReason.MISSING_TARGET_REF,
     'This pull request cannot be merged because the target reference `ref_name` is missing.'),
    (MergeFailureReason.MISSING_SOURCE_REF,
     'This pull request cannot be merged because the source reference `ref_name` is missing.'),
    (MergeFailureReason.SUBREPO_MERGE_FAILED,
     'This pull request cannot be merged because of conflicts related to sub repositories.'),
])
def test_merge_response_message(mr_type, expected_msg):
    """Each failure reason renders its message from the response metadata."""
    merge_ref = Reference('type', 'ref_name', '6126b7bfcc82ad2d3deaee22af926b082ce54cc6')
    metadata = {
        'unresolved_files': 'CONFLICT_FILE',
        'exception': "CRASH",
        'target': 'some-repo',
        'merge_commit': 'merge_commit',
        'target_ref': merge_ref,
        'source_ref': merge_ref,
        'heads': ','.join(['a', 'b', 'c']),
        'locked_by': 'user:123'
    }

    merge_response = MergeResponse(True, True, merge_ref, mr_type, metadata=metadata)
    assert merge_response.merge_status_message == expected_msg


@pytest.fixture()
def merge_extras(request, user_regular):
    """
    Context for the vcs operation when running a merge.

    Returns a plain dict; tests overwrite keys such as 'repository' and
    'locked_by' before passing it to `merge_repo`.
    """
    extras = {
        'ip': '127.0.0.1',
        'username': user_regular.username,
        'user_id': user_regular.user_id,
        'action': 'push',
        'repository': 'fake_target_repo_name',
        'scm': 'git',
        'config': request.config.getini('pyramid_config'),
        'repo_store': '',
        'make_lock': None,
        'locked_by': [None, None, None],
        'server_url': 'http://test.example.com:5000',
        'hooks': ['push', 'pull'],
        'is_shadow_repo': False,
    }
    return extras


@pytest.mark.usefixtures('config_stub')
class TestUpdateCommentHandling(object):
    """Tests how inline comments are flagged when a pull request is updated."""

    @pytest.fixture(autouse=True, scope='class')
    def enable_outdated_comments(self, request, baseapp):
        """Patch the config so outdated comments are enabled for the class."""
        config_patch = mock.patch.dict(
            'rhodecode.CONFIG', {'rhodecode_use_outdated_comments': True})
        config_patch.start()

        @request.addfinalizer
        def cleanup():
            config_patch.stop()

    def test_comment_stays_unflagged_on_unchanged_diff(self, pr_util):
        """Adding an unrelated file keeps the comment on `file_b` visible."""
        commits = [
            {'message': 'a'},
            {'message': 'b', 'added': [FileNode(b'file_b', b'test_content\n')]},
            {'message': 'c', 'added': [FileNode(b'file_c', b'test_content\n')]},
        ]
        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'])
        pr_util.create_inline_comment(file_path='file_b')
        pr_util.add_one_commit(head='c')

        assert_inline_comments(pull_request, visible=1, outdated=0)

    def test_comment_stays_unflagged_on_change_above(self, pr_util):
        """A line added at the top moves the comment from `n8` to `n9`."""
        original_content = b''.join((b'line %d\n' % x for x in range(1, 11)))
        updated_content = b'new_line_at_top\n' + original_content
        commits = [
            {'message': 'a'},
            {'message': 'b', 'added': [FileNode(b'file_b', original_content)]},
            {'message': 'c', 'changed': [FileNode(b'file_b', updated_content)]},
        ]
        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'])

        with outdated_comments_patcher():
            comment = pr_util.create_inline_comment(
                line_no=u'n8', file_path='file_b')
            pr_util.add_one_commit(head='c')

        assert_inline_comments(pull_request, visible=1, outdated=0)
        assert comment.line_no == u'n9'

    def test_comment_stays_unflagged_on_change_below(self, pr_util):
        """A line appended at the end keeps the comment visible."""
        original_content = b''.join([b'line %d\n' % x for x in range(10)])
        updated_content = original_content + b'new_line_at_end\n'
        commits = [
            {'message': 'a'},
            {'message': 'b', 'added': [FileNode(b'file_b', original_content)]},
            {'message': 'c', 'changed': [FileNode(b'file_b', updated_content)]},
        ]
        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'])
        pr_util.create_inline_comment(file_path='file_b')
        pr_util.add_one_commit(head='c')

        assert_inline_comments(pull_request, visible=1, outdated=0)

    @pytest.mark.parametrize('line_no', ['n4', 'o4', 'n10', 'o9'])
    def test_comment_flagged_on_change_around_context(self, pr_util, line_no):
        """Changing the first and last line of the file outdates the comment."""
        base_lines = [b'line %d\n' % x for x in range(1, 13)]
        change_lines = list(base_lines)
        change_lines.insert(6, b'line 6a added\n')

        # Changes on the last line of sight
        update_lines = list(change_lines)
        update_lines[0] = b'line 1 changed\n'
        update_lines[-1] = b'line 12 changed\n'

        def file_b(lines):
            return FileNode(b'file_b', b''.join(lines))

        commits = [
            {'message': 'a', 'added': [file_b(base_lines)]},
            {'message': 'b', 'changed': [file_b(change_lines)]},
            {'message': 'c', 'changed': [file_b(update_lines)]},
        ]

        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'])
        pr_util.create_inline_comment(line_no=line_no, file_path='file_b')

        with outdated_comments_patcher():
            pr_util.add_one_commit(head='c')
            assert_inline_comments(pull_request, visible=0, outdated=1)

    # Test ids are plain strings; the previous bytes id is rejected at
    # collection time by pytest releases that only accept string ids.
    @pytest.mark.parametrize("change, content", [
        ('changed', b'changed\n'),
        ('removed', b''),
    ], ids=['changed', 'removed'])
    def test_comment_flagged_on_change(self, pr_util, change, content):
        """Changing or removing the commented file outdates the comment."""
        commits = [
            {'message': 'a'},
            {'message': 'b', 'added': [FileNode(b'file_b', b'test_content\n')]},
            {'message': 'c', change: [FileNode(b'file_b', content)]},
        ]
        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'])
        pr_util.create_inline_comment(file_path='file_b')

        with outdated_comments_patcher():
            pr_util.add_one_commit(head='c')
            assert_inline_comments(pull_request, visible=0, outdated=1)


@pytest.mark.usefixtures('config_stub')
class TestUpdateChangedFiles(object):
    """Tests the per-file change calculation between pull request versions."""

    def test_no_changes_on_unchanged_diff(self, pr_util):
        """Adding `file_c` in the update is reported as one added file."""
        commits = [
            {'message': 'a'},
            {'message': 'b',
             'added': [FileNode(b'file_b', b'test_content b\n')]},
            {'message': 'c',
             'added': [FileNode(b'file_c', b'test_content c\n')]},
        ]
        # open a PR from a to b, adding file_b
        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'],
            name_suffix='per-file-review')

        # modify PR adding new file file_c
        pr_util.add_one_commit(head='c')

        assert_pr_file_changes(
            pull_request,
            added=['file_c'],
            modified=[],
            removed=[])

    def test_modify_and_undo_modification_diff(self, pr_util):
        """A modification that is reverted later ends up as no change."""
        commits = [
            {'message': 'a'},
            {'message': 'b',
             'added': [FileNode(b'file_b', b'test_content b\n')]},
            {'message': 'c',
             'changed': [FileNode(b'file_b', b'test_content b modified\n')]},
            {'message': 'd',
             'changed': [FileNode(b'file_b', b'test_content b\n')]},
        ]
        # open a PR from a to b, adding file_b
        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'],
            name_suffix='per-file-review')

        # modify PR modifying file file_b
        pr_util.add_one_commit(head='c')

        assert_pr_file_changes(
            pull_request,
            added=[],
            modified=['file_b'],
            removed=[])

        # move the head again to d, which rollbacks change,
        # meaning we should indicate no changes
        pr_util.add_one_commit(head='d')

        assert_pr_file_changes(
            pull_request,
            added=[],
            modified=[],
            removed=[])

    def test_updated_all_files_in_pr(self, pr_util):
        """Changing all three files reports all of them as modified."""
        commits = [
            {'message': 'a'},
            {'message': 'b', 'added': [
                FileNode(b'file_a', b'test_content a\n'),
                FileNode(b'file_b', b'test_content b\n'),
                FileNode(b'file_c', b'test_content c\n')]},
            {'message': 'c', 'changed': [
                FileNode(b'file_a', b'test_content a changed\n'),
                FileNode(b'file_b', b'test_content b changed\n'),
                FileNode(b'file_c', b'test_content c changed\n')]},
        ]
        # open a PR from a to b, changing 3 files
        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'],
            name_suffix='per-file-review')

        pr_util.add_one_commit(head='c')

        assert_pr_file_changes(
            pull_request,
            added=[],
            modified=['file_a', 'file_b', 'file_c'],
            removed=[])

    def test_updated_and_removed_all_files_in_pr(self, pr_util):
        """Removing all three files reports all of them as removed."""
        commits = [
            {'message': 'a'},
            {'message': 'b', 'added': [
                FileNode(b'file_a', b'test_content a\n'),
                FileNode(b'file_b', b'test_content b\n'),
                FileNode(b'file_c', b'test_content c\n')]},
            {'message': 'c', 'removed': [
                FileNode(b'file_a', b'test_content a changed\n'),
                FileNode(b'file_b', b'test_content b changed\n'),
                FileNode(b'file_c', b'test_content c changed\n')]},
        ]
        # open a PR from a to b, removing 3 files
        pull_request = pr_util.create_pull_request(
            commits=commits, target_head='a', source_head='b', revisions=['b'],
            name_suffix='per-file-review')

        pr_util.add_one_commit(head='c')

        assert_pr_file_changes(
            pull_request,
            added=[],
            modified=[],
            removed=['file_a', 'file_b', 'file_c'])


def test_update_writes_snapshot_into_pull_request_version(pr_util, config_stub):
    """Updating commits after a source change creates one version entry."""
    model = PullRequestModel()
    pull_request = pr_util.create_pull_request()
    pr_util.update_source_repository()

    model.update_commits(pull_request, pull_request.author)

    # Expect that it has a version entry now
    assert len(model.get_versions(pull_request)) == 1


def test_update_skips_new_version_if_unchanged(pr_util, config_stub):
    """Updating commits without a source change creates no version."""
    pull_request = pr_util.create_pull_request()
    model = PullRequestModel()
    model.update_commits(pull_request, pull_request.author)

    # Expect that it still has no versions
    assert len(model.get_versions(pull_request)) == 0


def test_update_assigns_comments_to_the_new_version(pr_util, config_stub):
    """An existing comment gets linked to the version created by the update."""
    model = PullRequestModel()
    pull_request = pr_util.create_pull_request()
    comment = pr_util.create_comment()
    pr_util.update_source_repository()

    model.update_commits(pull_request, pull_request.author)

    # Expect that the comment is linked to the pr version now
    assert comment.pull_request_version == model.get_versions(pull_request)[0]


def test_update_adds_a_comment_to_the_pull_request_about_the_change(pr_util, config_stub):
    """The update adds a comment describing the changed commits and files."""
    model = PullRequestModel()
    pull_request = pr_util.create_pull_request()
    pr_util.update_source_repository()
    pr_util.update_source_repository()

    update_response = model.update_commits(pull_request, pull_request.author)

    commit_id = update_response.common_ancestor_id
    # Expect to find a new comment about the change
    # NOTE(review): the line breaks and indentation inside this literal were
    # reconstructed from a whitespace-collapsed copy - confirm against the
    # message the update actually renders.
    expected_message = textwrap.dedent(
        """\
        Pull request updated. Auto status change to |under_review|

        .. role:: added
        .. role:: removed
        .. parsed-literal::

          Changed commits:
            * :added:`1 added`
            * :removed:`0 removed`

          Changed files:
            * `A file_2 <#a_c-{}-92ed3b5f07b4>`_

        .. |under_review| replace:: *"Under Review"*"""
    ).format(commit_id[:12])
    pull_request_comments = sorted(
        pull_request.comments, key=lambda c: c.modified_at)
    update_comment = pull_request_comments[-1]
    assert update_comment.text == expected_message


def test_create_version_from_snapshot_updates_attributes(pr_util, config_stub):
    """A version snapshot copies the attributes of its pull request."""
    pull_request = pr_util.create_pull_request()

    # Avoiding default values
    pull_request.status = PullRequest.STATUS_CLOSED
    pull_request._last_merge_source_rev = "0" * 40
    pull_request._last_merge_target_rev = "1" * 40
    pull_request.last_merge_status = 1
    pull_request.merge_rev = "2" * 40

    # Remember automatic values
    created_on = pull_request.created_on
    updated_on = pull_request.updated_on

    # Create a new version of the pull request
    version = PullRequestModel()._create_version_from_snapshot(pull_request)

    # Check attributes
    assert version.title == pr_util.create_parameters['title']
    assert version.description == pr_util.create_parameters['description']
    assert version.status == PullRequest.STATUS_CLOSED

    # versions get updated created_on
    assert version.created_on != created_on

    assert version.updated_on == updated_on
    assert version.user_id == pull_request.user_id
    assert version.revisions == pr_util.create_parameters['revisions']
    assert version.source_repo == pr_util.source_repository
    assert version.source_ref == pr_util.create_parameters['source_ref']
    assert version.target_repo == pr_util.target_repository
    assert version.target_ref == pr_util.create_parameters['target_ref']
    assert version._last_merge_source_rev == pull_request._last_merge_source_rev
    assert version._last_merge_target_rev == pull_request._last_merge_target_rev
    assert version.last_merge_status == pull_request.last_merge_status
    assert version.merge_rev == pull_request.merge_rev
    assert version.pull_request == pull_request


def test_link_comments_to_version_only_updates_unlinked_comments(pr_util, config_stub):
    """Only comments without a version are linked to the new version."""
    version1 = pr_util.create_version_of_pull_request()
    comment_linked = pr_util.create_comment(linked_to=version1)
    comment_unlinked = pr_util.create_comment()
    version2 = pr_util.create_version_of_pull_request()

    PullRequestModel()._link_comments_to_version(version2)
    Session().commit()

    # Expect that only the new comment is linked to version2
    assert (
        comment_unlinked.pull_request_version_id ==
        version2.pull_request_version_id)
    assert (
        comment_linked.pull_request_version_id ==
        version1.pull_request_version_id)
    assert (
        comment_unlinked.pull_request_version_id !=
        comment_linked.pull_request_version_id)


def test_calculate_commits():
    """Commit id changes are split into added, common, removed and total."""
    old_ids = [1, 2, 3]
    new_ids = [1, 3, 4, 5]
    change = PullRequestModel()._calculate_commit_id_changes(old_ids, new_ids)
    assert change.added == [4, 5]
    assert change.common == [1, 3]
    assert change.removed == [2]
    assert change.total == [1, 3, 4, 5]


def assert_inline_comments(pull_request, visible=None, outdated=None):
    """
    Assert the number of visible and/or outdated inline comments.

    :param pull_request: pull request whose comments are inspected.
    :param visible: expected number of inline comments, `None` skips the check.
    :param outdated: expected number of outdated comments, `None` skips the
        check.
    """
    if visible is not None:
        inline_comments = CommentsModel().get_inline_comments(
            pull_request.target_repo.repo_id, pull_request=pull_request)
        inline_cnt = len(CommentsModel().get_inline_comments_as_list(
            inline_comments))
        assert inline_cnt == visible
    if outdated is not None:
        outdated_comments = CommentsModel().get_outdated_comments(
            pull_request.target_repo.repo_id, pull_request)
        assert len(outdated_comments) == outdated


def assert_pr_file_changes(
        pull_request, added=None, modified=None, removed=None):
    """
    Assert the file changes between the first version and the current state.

    :param pull_request: pull request which has at least one version.
    :param added: expected list of added file names.
    :param modified: expected list of modified file names.
    :param removed: expected list of removed file names.
    """
    pr_versions = PullRequestModel().get_versions(pull_request)
    # always use first version, ie original PR to calculate changes
    pull_request_version = pr_versions[0]
    old_diff_data, new_diff_data = PullRequestModel()._generate_update_diffs(
        pull_request, pull_request_version)
    file_changes = PullRequestModel()._calculate_file_changes(
        old_diff_data, new_diff_data)

    assert added == file_changes.added, \
        'expected added:%s vs value:%s' % (added, file_changes.added)
    assert modified == file_changes.modified, \
        'expected modified:%s vs value:%s' % (modified, file_changes.modified)
    assert removed == file_changes.removed, \
        'expected removed:%s vs value:%s' % (removed, file_changes.removed)


def outdated_comments_patcher(use_outdated=True):
    """
    Return a patcher replacing `CommentsModel.use_outdated_comments` with a
    mock that returns `use_outdated`.
    """
    return mock.patch.object(
        CommentsModel, 'use_outdated_comments',
        return_value=use_outdated)