##// END OF EJS Templates
http-proto: in case incoming requests come in as chunked stream the data to VCSServer....
http-proto: in case incoming requests come in as chunked stream the data to VCSServer. This should solve a problem of uploading large files to rhodecode. In case of git with small postBuffers GIT client streams data to the server. In such case we want to stream the data back again to vcsserver without reading it fully inside RhodeCode.

File last commit:

r1409:c1ce56be default
r1423:8b2e03e1 default
Show More
test_pullrequest.py
851 lines | 33.2 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2017 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import mock
import pytest
import textwrap
import rhodecode
from rhodecode.lib.utils2 import safe_unicode
from rhodecode.lib.vcs.backends import get_backend
from rhodecode.lib.vcs.backends.base import (
MergeResponse, MergeFailureReason, Reference)
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.model.comment import CommentsModel
from rhodecode.model.db import PullRequest, Session
from rhodecode.model.pull_request import PullRequestModel
from rhodecode.model.user import UserModel
from rhodecode.tests import TEST_USER_ADMIN_LOGIN
pytestmark = [
pytest.mark.backends("git", "hg"),
]
class TestPullRequestModel:
@pytest.fixture
def pull_request(self, request, backend, pr_util):
"""
A pull request combined with multiples patches.
"""
BackendClass = get_backend(backend.alias)
self.merge_patcher = mock.patch.object(BackendClass, 'merge')
self.workspace_remove_patcher = mock.patch.object(
BackendClass, 'cleanup_merge_workspace')
self.workspace_remove_mock = self.workspace_remove_patcher.start()
self.merge_mock = self.merge_patcher.start()
self.comment_patcher = mock.patch(
'rhodecode.model.changeset_status.ChangesetStatusModel.set_status')
self.comment_patcher.start()
self.notification_patcher = mock.patch(
'rhodecode.model.notification.NotificationModel.create')
self.notification_patcher.start()
self.helper_patcher = mock.patch(
'rhodecode.lib.helpers.url')
self.helper_patcher.start()
self.hook_patcher = mock.patch.object(PullRequestModel,
'_trigger_pull_request_hook')
self.hook_mock = self.hook_patcher.start()
self.invalidation_patcher = mock.patch(
'rhodecode.model.pull_request.ScmModel.mark_for_invalidation')
self.invalidation_mock = self.invalidation_patcher.start()
self.pull_request = pr_util.create_pull_request(
mergeable=True, name_suffix=u'ąć')
self.source_commit = self.pull_request.source_ref_parts.commit_id
self.target_commit = self.pull_request.target_ref_parts.commit_id
self.workspace_id = 'pr-%s' % self.pull_request.pull_request_id
@request.addfinalizer
def cleanup_pull_request():
calls = [mock.call(
self.pull_request, self.pull_request.author, 'create')]
self.hook_mock.assert_has_calls(calls)
self.workspace_remove_patcher.stop()
self.merge_patcher.stop()
self.comment_patcher.stop()
self.notification_patcher.stop()
self.helper_patcher.stop()
self.hook_patcher.stop()
self.invalidation_patcher.stop()
return self.pull_request
def test_get_all(self, pull_request):
prs = PullRequestModel().get_all(pull_request.target_repo)
assert isinstance(prs, list)
assert len(prs) == 1
def test_count_all(self, pull_request):
pr_count = PullRequestModel().count_all(pull_request.target_repo)
assert pr_count == 1
def test_get_awaiting_review(self, pull_request):
prs = PullRequestModel().get_awaiting_review(pull_request.target_repo)
assert isinstance(prs, list)
assert len(prs) == 1
def test_count_awaiting_review(self, pull_request):
pr_count = PullRequestModel().count_awaiting_review(
pull_request.target_repo)
assert pr_count == 1
def test_get_awaiting_my_review(self, pull_request):
PullRequestModel().update_reviewers(
pull_request, [(pull_request.author, ['author'])])
prs = PullRequestModel().get_awaiting_my_review(
pull_request.target_repo, user_id=pull_request.author.user_id)
assert isinstance(prs, list)
assert len(prs) == 1
def test_count_awaiting_my_review(self, pull_request):
PullRequestModel().update_reviewers(
pull_request, [(pull_request.author, ['author'])])
pr_count = PullRequestModel().count_awaiting_my_review(
pull_request.target_repo, user_id=pull_request.author.user_id)
assert pr_count == 1
def test_delete_calls_cleanup_merge(self, pull_request):
PullRequestModel().delete(pull_request)
self.workspace_remove_mock.assert_called_once_with(
self.workspace_id)
def test_close_calls_cleanup_and_hook(self, pull_request):
PullRequestModel().close_pull_request(
pull_request, pull_request.author)
self.workspace_remove_mock.assert_called_once_with(
self.workspace_id)
self.hook_mock.assert_called_with(
self.pull_request, self.pull_request.author, 'close')
def test_merge_status(self, pull_request):
self.merge_mock.return_value = MergeResponse(
True, False, None, MergeFailureReason.NONE)
assert pull_request._last_merge_source_rev is None
assert pull_request._last_merge_target_rev is None
assert pull_request._last_merge_status is None
status, msg = PullRequestModel().merge_status(pull_request)
assert status is True
assert msg.eval() == 'This pull request can be automatically merged.'
self.merge_mock.assert_called_once_with(
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts, self.workspace_id, dry_run=True,
use_rebase=False)
assert pull_request._last_merge_source_rev == self.source_commit
assert pull_request._last_merge_target_rev == self.target_commit
assert pull_request._last_merge_status is MergeFailureReason.NONE
self.merge_mock.reset_mock()
status, msg = PullRequestModel().merge_status(pull_request)
assert status is True
assert msg.eval() == 'This pull request can be automatically merged.'
assert self.merge_mock.called is False
def test_merge_status_known_failure(self, pull_request):
self.merge_mock.return_value = MergeResponse(
False, False, None, MergeFailureReason.MERGE_FAILED)
assert pull_request._last_merge_source_rev is None
assert pull_request._last_merge_target_rev is None
assert pull_request._last_merge_status is None
status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert (
msg.eval() ==
'This pull request cannot be merged because of merge conflicts.')
self.merge_mock.assert_called_once_with(
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts, self.workspace_id, dry_run=True,
use_rebase=False)
assert pull_request._last_merge_source_rev == self.source_commit
assert pull_request._last_merge_target_rev == self.target_commit
assert (
pull_request._last_merge_status is MergeFailureReason.MERGE_FAILED)
self.merge_mock.reset_mock()
status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert (
msg.eval() ==
'This pull request cannot be merged because of merge conflicts.')
assert self.merge_mock.called is False
def test_merge_status_unknown_failure(self, pull_request):
self.merge_mock.return_value = MergeResponse(
False, False, None, MergeFailureReason.UNKNOWN)
assert pull_request._last_merge_source_rev is None
assert pull_request._last_merge_target_rev is None
assert pull_request._last_merge_status is None
status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg.eval() == (
'This pull request cannot be merged because of an unhandled'
' exception.')
self.merge_mock.assert_called_once_with(
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts, self.workspace_id, dry_run=True,
use_rebase=False)
assert pull_request._last_merge_source_rev is None
assert pull_request._last_merge_target_rev is None
assert pull_request._last_merge_status is None
self.merge_mock.reset_mock()
status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg.eval() == (
'This pull request cannot be merged because of an unhandled'
' exception.')
assert self.merge_mock.called is True
def test_merge_status_when_target_is_locked(self, pull_request):
pull_request.target_repo.locked = [1, u'12345.50', 'lock_web']
status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg.eval() == (
'This pull request cannot be merged because the target repository'
' is locked.')
def test_merge_status_requirements_check_target(self, pull_request):
def has_largefiles(self, repo):
return repo == pull_request.source_repo
patcher = mock.patch.object(
PullRequestModel, '_has_largefiles', has_largefiles)
with patcher:
status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == 'Target repository large files support is disabled.'
def test_merge_status_requirements_check_source(self, pull_request):
def has_largefiles(self, repo):
return repo == pull_request.target_repo
patcher = mock.patch.object(
PullRequestModel, '_has_largefiles', has_largefiles)
with patcher:
status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == 'Source repository large files support is disabled.'
def test_merge(self, pull_request, merge_extras):
user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
merge_ref = Reference(
'type', 'name', '6126b7bfcc82ad2d3deaee22af926b082ce54cc6')
self.merge_mock.return_value = MergeResponse(
True, True, merge_ref, MergeFailureReason.NONE)
merge_extras['repository'] = pull_request.target_repo.repo_name
PullRequestModel().merge(
pull_request, pull_request.author, extras=merge_extras)
message = (
u'Merge pull request #{pr_id} from {source_repo} {source_ref_name}'
u'\n\n {pr_title}'.format(
pr_id=pull_request.pull_request_id,
source_repo=safe_unicode(
pull_request.source_repo.scm_instance().name),
source_ref_name=pull_request.source_ref_parts.name,
pr_title=safe_unicode(pull_request.title)
)
)
self.merge_mock.assert_called_once_with(
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts, self.workspace_id,
user_name=user.username, user_email=user.email, message=message,
use_rebase=False
)
self.invalidation_mock.assert_called_once_with(
pull_request.target_repo.repo_name)
self.hook_mock.assert_called_with(
self.pull_request, self.pull_request.author, 'merge')
pull_request = PullRequest.get(pull_request.pull_request_id)
assert (
pull_request.merge_rev ==
'6126b7bfcc82ad2d3deaee22af926b082ce54cc6')
def test_merge_failed(self, pull_request, merge_extras):
user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
merge_ref = Reference(
'type', 'name', '6126b7bfcc82ad2d3deaee22af926b082ce54cc6')
self.merge_mock.return_value = MergeResponse(
False, False, merge_ref, MergeFailureReason.MERGE_FAILED)
merge_extras['repository'] = pull_request.target_repo.repo_name
PullRequestModel().merge(
pull_request, pull_request.author, extras=merge_extras)
message = (
u'Merge pull request #{pr_id} from {source_repo} {source_ref_name}'
u'\n\n {pr_title}'.format(
pr_id=pull_request.pull_request_id,
source_repo=safe_unicode(
pull_request.source_repo.scm_instance().name),
source_ref_name=pull_request.source_ref_parts.name,
pr_title=safe_unicode(pull_request.title)
)
)
self.merge_mock.assert_called_once_with(
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts, self.workspace_id,
user_name=user.username, user_email=user.email, message=message,
use_rebase=False
)
pull_request = PullRequest.get(pull_request.pull_request_id)
assert self.invalidation_mock.called is False
assert pull_request.merge_rev is None
def test_get_commit_ids(self, pull_request):
# The PR has been not merget yet, so expect an exception
with pytest.raises(ValueError):
PullRequestModel()._get_commit_ids(pull_request)
# Merge revision is in the revisions list
pull_request.merge_rev = pull_request.revisions[0]
commit_ids = PullRequestModel()._get_commit_ids(pull_request)
assert commit_ids == pull_request.revisions
# Merge revision is not in the revisions list
pull_request.merge_rev = 'f000' * 10
commit_ids = PullRequestModel()._get_commit_ids(pull_request)
assert commit_ids == pull_request.revisions + [pull_request.merge_rev]
def test_get_diff_from_pr_version(self, pull_request):
source_repo = pull_request.source_repo
source_ref_id = pull_request.source_ref_parts.commit_id
target_ref_id = pull_request.target_ref_parts.commit_id
diff = PullRequestModel()._get_diff_from_pr_or_version(
source_repo, source_ref_id, target_ref_id, context=6)
assert 'file_1' in diff.raw
def test_generate_title_returns_unicode(self):
title = PullRequestModel().generate_pullrequest_title(
source='source-dummy',
source_ref='source-ref-dummy',
target='target-dummy',
)
assert type(title) == unicode
class TestIntegrationMerge(object):
@pytest.mark.parametrize('extra_config', (
{'vcs.hooks.protocol': 'http', 'vcs.hooks.direct_calls': False},
))
def test_merge_triggers_push_hooks(
self, pr_util, user_admin, capture_rcextensions, merge_extras,
extra_config):
pull_request = pr_util.create_pull_request(
approved=True, mergeable=True)
# TODO: johbo: Needed for sqlite, try to find an automatic way for it
merge_extras['repository'] = pull_request.target_repo.repo_name
Session().commit()
with mock.patch.dict(rhodecode.CONFIG, extra_config, clear=False):
merge_state = PullRequestModel().merge(
pull_request, user_admin, extras=merge_extras)
assert merge_state.executed
assert 'pre_push' in capture_rcextensions
assert 'post_push' in capture_rcextensions
def test_merge_can_be_rejected_by_pre_push_hook(
self, pr_util, user_admin, capture_rcextensions, merge_extras):
pull_request = pr_util.create_pull_request(
approved=True, mergeable=True)
# TODO: johbo: Needed for sqlite, try to find an automatic way for it
merge_extras['repository'] = pull_request.target_repo.repo_name
Session().commit()
with mock.patch('rhodecode.EXTENSIONS.PRE_PUSH_HOOK') as pre_pull:
pre_pull.side_effect = RepositoryError("Disallow push!")
merge_status = PullRequestModel().merge(
pull_request, user_admin, extras=merge_extras)
assert not merge_status.executed
assert 'pre_push' not in capture_rcextensions
assert 'post_push' not in capture_rcextensions
def test_merge_fails_if_target_is_locked(
self, pr_util, user_regular, merge_extras):
pull_request = pr_util.create_pull_request(
approved=True, mergeable=True)
locked_by = [user_regular.user_id + 1, 12345.50, 'lock_web']
pull_request.target_repo.locked = locked_by
# TODO: johbo: Check if this can work based on the database, currently
# all data is pre-computed, that's why just updating the DB is not
# enough.
merge_extras['locked_by'] = locked_by
merge_extras['repository'] = pull_request.target_repo.repo_name
# TODO: johbo: Needed for sqlite, try to find an automatic way for it
Session().commit()
merge_status = PullRequestModel().merge(
pull_request, user_regular, extras=merge_extras)
assert not merge_status.executed
@pytest.mark.parametrize('use_outdated, inlines_count, outdated_count', [
(False, 1, 0),
(True, 0, 1),
])
def test_outdated_comments(
pr_util, use_outdated, inlines_count, outdated_count):
pull_request = pr_util.create_pull_request()
pr_util.create_inline_comment(file_path='not_in_updated_diff')
with outdated_comments_patcher(use_outdated) as outdated_comment_mock:
pr_util.add_one_commit()
assert_inline_comments(
pull_request, visible=inlines_count, outdated=outdated_count)
outdated_comment_mock.assert_called_with(pull_request)
@pytest.fixture
def merge_extras(user_regular):
"""
Context for the vcs operation when running a merge.
"""
extras = {
'ip': '127.0.0.1',
'username': user_regular.username,
'action': 'push',
'repository': 'fake_target_repo_name',
'scm': 'git',
'config': 'fake_config_ini_path',
'make_lock': None,
'locked_by': [None, None, None],
'server_url': 'http://test.example.com:5000',
'hooks': ['push', 'pull'],
'is_shadow_repo': False,
}
return extras
class TestUpdateCommentHandling(object):
@pytest.fixture(autouse=True, scope='class')
def enable_outdated_comments(self, request, pylonsapp):
config_patch = mock.patch.dict(
'rhodecode.CONFIG', {'rhodecode_use_outdated_comments': True})
config_patch.start()
@request.addfinalizer
def cleanup():
config_patch.stop()
def test_comment_stays_unflagged_on_unchanged_diff(self, pr_util):
commits = [
{'message': 'a'},
{'message': 'b', 'added': [FileNode('file_b', 'test_content\n')]},
{'message': 'c', 'added': [FileNode('file_c', 'test_content\n')]},
]
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'])
pr_util.create_inline_comment(file_path='file_b')
pr_util.add_one_commit(head='c')
assert_inline_comments(pull_request, visible=1, outdated=0)
def test_comment_stays_unflagged_on_change_above(self, pr_util):
original_content = ''.join(
['line {}\n'.format(x) for x in range(1, 11)])
updated_content = 'new_line_at_top\n' + original_content
commits = [
{'message': 'a'},
{'message': 'b', 'added': [FileNode('file_b', original_content)]},
{'message': 'c', 'changed': [FileNode('file_b', updated_content)]},
]
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'])
with outdated_comments_patcher():
comment = pr_util.create_inline_comment(
line_no=u'n8', file_path='file_b')
pr_util.add_one_commit(head='c')
assert_inline_comments(pull_request, visible=1, outdated=0)
assert comment.line_no == u'n9'
def test_comment_stays_unflagged_on_change_below(self, pr_util):
original_content = ''.join(['line {}\n'.format(x) for x in range(10)])
updated_content = original_content + 'new_line_at_end\n'
commits = [
{'message': 'a'},
{'message': 'b', 'added': [FileNode('file_b', original_content)]},
{'message': 'c', 'changed': [FileNode('file_b', updated_content)]},
]
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'])
pr_util.create_inline_comment(file_path='file_b')
pr_util.add_one_commit(head='c')
assert_inline_comments(pull_request, visible=1, outdated=0)
@pytest.mark.parametrize('line_no', ['n4', 'o4', 'n10', 'o9'])
def test_comment_flagged_on_change_around_context(self, pr_util, line_no):
base_lines = ['line {}\n'.format(x) for x in range(1, 13)]
change_lines = list(base_lines)
change_lines.insert(6, 'line 6a added\n')
# Changes on the last line of sight
update_lines = list(change_lines)
update_lines[0] = 'line 1 changed\n'
update_lines[-1] = 'line 12 changed\n'
def file_b(lines):
return FileNode('file_b', ''.join(lines))
commits = [
{'message': 'a', 'added': [file_b(base_lines)]},
{'message': 'b', 'changed': [file_b(change_lines)]},
{'message': 'c', 'changed': [file_b(update_lines)]},
]
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'])
pr_util.create_inline_comment(line_no=line_no, file_path='file_b')
with outdated_comments_patcher():
pr_util.add_one_commit(head='c')
assert_inline_comments(pull_request, visible=0, outdated=1)
@pytest.mark.parametrize("change, content", [
('changed', 'changed\n'),
('removed', ''),
], ids=['changed', 'removed'])
def test_comment_flagged_on_change(self, pr_util, change, content):
commits = [
{'message': 'a'},
{'message': 'b', 'added': [FileNode('file_b', 'test_content\n')]},
{'message': 'c', change: [FileNode('file_b', content)]},
]
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'])
pr_util.create_inline_comment(file_path='file_b')
with outdated_comments_patcher():
pr_util.add_one_commit(head='c')
assert_inline_comments(pull_request, visible=0, outdated=1)
class TestUpdateChangedFiles(object):
def test_no_changes_on_unchanged_diff(self, pr_util):
commits = [
{'message': 'a'},
{'message': 'b',
'added': [FileNode('file_b', 'test_content b\n')]},
{'message': 'c',
'added': [FileNode('file_c', 'test_content c\n')]},
]
# open a PR from a to b, adding file_b
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'],
name_suffix='per-file-review')
# modify PR adding new file file_c
pr_util.add_one_commit(head='c')
assert_pr_file_changes(
pull_request,
added=['file_c'],
modified=[],
removed=[])
def test_modify_and_undo_modification_diff(self, pr_util):
commits = [
{'message': 'a'},
{'message': 'b',
'added': [FileNode('file_b', 'test_content b\n')]},
{'message': 'c',
'changed': [FileNode('file_b', 'test_content b modified\n')]},
{'message': 'd',
'changed': [FileNode('file_b', 'test_content b\n')]},
]
# open a PR from a to b, adding file_b
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'],
name_suffix='per-file-review')
# modify PR modifying file file_b
pr_util.add_one_commit(head='c')
assert_pr_file_changes(
pull_request,
added=[],
modified=['file_b'],
removed=[])
# move the head again to d, which rollbacks change,
# meaning we should indicate no changes
pr_util.add_one_commit(head='d')
assert_pr_file_changes(
pull_request,
added=[],
modified=[],
removed=[])
def test_updated_all_files_in_pr(self, pr_util):
commits = [
{'message': 'a'},
{'message': 'b', 'added': [
FileNode('file_a', 'test_content a\n'),
FileNode('file_b', 'test_content b\n'),
FileNode('file_c', 'test_content c\n')]},
{'message': 'c', 'changed': [
FileNode('file_a', 'test_content a changed\n'),
FileNode('file_b', 'test_content b changed\n'),
FileNode('file_c', 'test_content c changed\n')]},
]
# open a PR from a to b, changing 3 files
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'],
name_suffix='per-file-review')
pr_util.add_one_commit(head='c')
assert_pr_file_changes(
pull_request,
added=[],
modified=['file_a', 'file_b', 'file_c'],
removed=[])
def test_updated_and_removed_all_files_in_pr(self, pr_util):
commits = [
{'message': 'a'},
{'message': 'b', 'added': [
FileNode('file_a', 'test_content a\n'),
FileNode('file_b', 'test_content b\n'),
FileNode('file_c', 'test_content c\n')]},
{'message': 'c', 'removed': [
FileNode('file_a', 'test_content a changed\n'),
FileNode('file_b', 'test_content b changed\n'),
FileNode('file_c', 'test_content c changed\n')]},
]
# open a PR from a to b, removing 3 files
pull_request = pr_util.create_pull_request(
commits=commits, target_head='a', source_head='b', revisions=['b'],
name_suffix='per-file-review')
pr_util.add_one_commit(head='c')
assert_pr_file_changes(
pull_request,
added=[],
modified=[],
removed=['file_a', 'file_b', 'file_c'])
def test_update_writes_snapshot_into_pull_request_version(pr_util):
model = PullRequestModel()
pull_request = pr_util.create_pull_request()
pr_util.update_source_repository()
model.update_commits(pull_request)
# Expect that it has a version entry now
assert len(model.get_versions(pull_request)) == 1
def test_update_skips_new_version_if_unchanged(pr_util):
pull_request = pr_util.create_pull_request()
model = PullRequestModel()
model.update_commits(pull_request)
# Expect that it still has no versions
assert len(model.get_versions(pull_request)) == 0
def test_update_assigns_comments_to_the_new_version(pr_util):
model = PullRequestModel()
pull_request = pr_util.create_pull_request()
comment = pr_util.create_comment()
pr_util.update_source_repository()
model.update_commits(pull_request)
# Expect that the comment is linked to the pr version now
assert comment.pull_request_version == model.get_versions(pull_request)[0]
def test_update_adds_a_comment_to_the_pull_request_about_the_change(pr_util):
model = PullRequestModel()
pull_request = pr_util.create_pull_request()
pr_util.update_source_repository()
pr_util.update_source_repository()
model.update_commits(pull_request)
# Expect to find a new comment about the change
expected_message = textwrap.dedent(
"""\
Pull request updated. Auto status change to |under_review|
.. role:: added
.. role:: removed
.. parsed-literal::
Changed commits:
* :added:`1 added`
* :removed:`0 removed`
Changed files:
* `A file_2 <#a_c--92ed3b5f07b4>`_
.. |under_review| replace:: *"Under Review"*"""
)
pull_request_comments = sorted(
pull_request.comments, key=lambda c: c.modified_at)
update_comment = pull_request_comments[-1]
assert update_comment.text == expected_message
def test_create_version_from_snapshot_updates_attributes(pr_util):
pull_request = pr_util.create_pull_request()
# Avoiding default values
pull_request.status = PullRequest.STATUS_CLOSED
pull_request._last_merge_source_rev = "0" * 40
pull_request._last_merge_target_rev = "1" * 40
pull_request._last_merge_status = 1
pull_request.merge_rev = "2" * 40
# Remember automatic values
created_on = pull_request.created_on
updated_on = pull_request.updated_on
# Create a new version of the pull request
version = PullRequestModel()._create_version_from_snapshot(pull_request)
# Check attributes
assert version.title == pr_util.create_parameters['title']
assert version.description == pr_util.create_parameters['description']
assert version.status == PullRequest.STATUS_CLOSED
# versions get updated created_on
assert version.created_on != created_on
assert version.updated_on == updated_on
assert version.user_id == pull_request.user_id
assert version.revisions == pr_util.create_parameters['revisions']
assert version.source_repo == pr_util.source_repository
assert version.source_ref == pr_util.create_parameters['source_ref']
assert version.target_repo == pr_util.target_repository
assert version.target_ref == pr_util.create_parameters['target_ref']
assert version._last_merge_source_rev == pull_request._last_merge_source_rev
assert version._last_merge_target_rev == pull_request._last_merge_target_rev
assert version._last_merge_status == pull_request._last_merge_status
assert version.merge_rev == pull_request.merge_rev
assert version.pull_request == pull_request
def test_link_comments_to_version_only_updates_unlinked_comments(pr_util):
version1 = pr_util.create_version_of_pull_request()
comment_linked = pr_util.create_comment(linked_to=version1)
comment_unlinked = pr_util.create_comment()
version2 = pr_util.create_version_of_pull_request()
PullRequestModel()._link_comments_to_version(version2)
# Expect that only the new comment is linked to version2
assert (
comment_unlinked.pull_request_version_id ==
version2.pull_request_version_id)
assert (
comment_linked.pull_request_version_id ==
version1.pull_request_version_id)
assert (
comment_unlinked.pull_request_version_id !=
comment_linked.pull_request_version_id)
def test_calculate_commits():
old_ids = [1, 2, 3]
new_ids = [1, 3, 4, 5]
change = PullRequestModel()._calculate_commit_id_changes(old_ids, new_ids)
assert change.added == [4, 5]
assert change.common == [1, 3]
assert change.removed == [2]
assert change.total == [1, 3, 4, 5]
def assert_inline_comments(pull_request, visible=None, outdated=None):
if visible is not None:
inline_comments = CommentsModel().get_inline_comments(
pull_request.target_repo.repo_id, pull_request=pull_request)
inline_cnt = CommentsModel().get_inline_comments_count(
inline_comments)
assert inline_cnt == visible
if outdated is not None:
outdated_comments = CommentsModel().get_outdated_comments(
pull_request.target_repo.repo_id, pull_request)
assert len(outdated_comments) == outdated
def assert_pr_file_changes(
pull_request, added=None, modified=None, removed=None):
pr_versions = PullRequestModel().get_versions(pull_request)
# always use first version, ie original PR to calculate changes
pull_request_version = pr_versions[0]
old_diff_data, new_diff_data = PullRequestModel()._generate_update_diffs(
pull_request, pull_request_version)
file_changes = PullRequestModel()._calculate_file_changes(
old_diff_data, new_diff_data)
assert added == file_changes.added, \
'expected added:%s vs value:%s' % (added, file_changes.added)
assert modified == file_changes.modified, \
'expected modified:%s vs value:%s' % (modified, file_changes.modified)
assert removed == file_changes.removed, \
'expected removed:%s vs value:%s' % (removed, file_changes.removed)
def outdated_comments_patcher(use_outdated=True):
return mock.patch.object(
CommentsModel, 'use_outdated_comments',
return_value=use_outdated)