##// END OF EJS Templates
tests: fixed test suite for celery adoption
tests: fixed test suite for celery adoption

File last commit:

r5607:39b20522 default
r5607:39b20522 default
Show More
test_pullrequest.py
902 lines | 37.3 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import mock
import pytest
import textwrap
import rhodecode
from rhodecode.lib.vcs.backends import get_backend
from rhodecode.lib.vcs.backends.base import MergeResponse, MergeFailureReason, Reference
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.model.comment import CommentsModel
from rhodecode.model.db import PullRequest, Session
from rhodecode.model.pull_request import PullRequestModel
from rhodecode.model.user import UserModel
from rhodecode.tests import TEST_USER_ADMIN_LOGIN
from rhodecode.lib.str_utils import safe_str
pytestmark = [
pytest.mark.backends("git", "hg"),
]
@pytest.mark.usefixtures("config_stub")
class TestPullRequestModel(object):
@pytest.fixture()
def pull_request(self, request, backend, pr_util):
"""
A pull request combined with multiples patches.
"""
BackendClass = get_backend(backend.alias)
merge_resp = MergeResponse(False, False, None, MergeFailureReason.UNKNOWN, metadata={"exception": "MockError"})
self.merge_patcher = mock.patch.object(BackendClass, "merge", return_value=merge_resp)
self.workspace_remove_patcher = mock.patch.object(BackendClass, "cleanup_merge_workspace")
self.workspace_remove_mock = self.workspace_remove_patcher.start()
self.merge_mock = self.merge_patcher.start()
self.comment_patcher = mock.patch("rhodecode.model.changeset_status.ChangesetStatusModel.set_status")
self.comment_patcher.start()
self.notification_patcher = mock.patch("rhodecode.model.notification.NotificationModel.create")
self.notification_patcher.start()
self.helper_patcher = mock.patch("rhodecode.lib.helpers.route_path")
self.helper_patcher.start()
self.hook_patcher = mock.patch.object(PullRequestModel, "trigger_pull_request_hook")
self.hook_mock = self.hook_patcher.start()
self.invalidation_patcher = mock.patch("rhodecode.model.pull_request.ScmModel.mark_for_invalidation")
self.invalidation_mock = self.invalidation_patcher.start()
self.pull_request = pr_util.create_pull_request(mergeable=True, name_suffix="ąć")
self.source_commit = self.pull_request.source_ref_parts.commit_id
self.target_commit = self.pull_request.target_ref_parts.commit_id
self.workspace_id = f"pr-{self.pull_request.pull_request_id}"
self.repo_id = self.pull_request.target_repo.repo_id
@request.addfinalizer
def cleanup_pull_request():
calls = [mock.call(self.pull_request, self.pull_request.author, "create")]
self.hook_mock.assert_has_calls(calls)
self.workspace_remove_patcher.stop()
self.merge_patcher.stop()
self.comment_patcher.stop()
self.notification_patcher.stop()
self.helper_patcher.stop()
self.hook_patcher.stop()
self.invalidation_patcher.stop()
return self.pull_request
def test_get_all(self, pull_request):
prs = PullRequestModel().get_all(pull_request.target_repo)
assert isinstance(prs, list)
assert len(prs) == 1
def test_count_all(self, pull_request):
pr_count = PullRequestModel().count_all(pull_request.target_repo)
assert pr_count == 1
def test_get_awaiting_review(self, pull_request):
prs = PullRequestModel().get_awaiting_review(pull_request.target_repo)
assert isinstance(prs, list)
assert len(prs) == 1
def test_count_awaiting_review(self, pull_request):
pr_count = PullRequestModel().count_awaiting_review(pull_request.target_repo)
assert pr_count == 1
def test_get_awaiting_my_review(self, pull_request):
PullRequestModel().update_reviewers(
pull_request, [(pull_request.author, ["author"], False, "reviewer", [])], pull_request.author
)
Session().commit()
prs = PullRequestModel().get_awaiting_my_review(
pull_request.target_repo.repo_name, user_id=pull_request.author.user_id
)
assert isinstance(prs, list)
assert len(prs) == 1
def test_count_awaiting_my_review(self, pull_request):
PullRequestModel().update_reviewers(
pull_request, [(pull_request.author, ["author"], False, "reviewer", [])], pull_request.author
)
Session().commit()
pr_count = PullRequestModel().count_awaiting_my_review(
pull_request.target_repo.repo_name, user_id=pull_request.author.user_id
)
assert pr_count == 1
def test_delete_calls_cleanup_merge(self, pull_request):
repo_id = pull_request.target_repo.repo_id
PullRequestModel().delete(pull_request, pull_request.author)
Session().commit()
self.workspace_remove_mock.assert_called_once_with(repo_id, self.workspace_id)
def test_close_calls_cleanup_and_hook(self, pull_request):
PullRequestModel().close_pull_request(pull_request, pull_request.author)
Session().commit()
repo_id = pull_request.target_repo.repo_id
self.workspace_remove_mock.assert_called_once_with(repo_id, self.workspace_id)
self.hook_mock.assert_called_with(self.pull_request, self.pull_request.author, "close")
def test_merge_status(self, pull_request):
self.merge_mock.return_value = MergeResponse(True, False, None, MergeFailureReason.NONE)
assert pull_request._last_merge_source_rev is None
assert pull_request._last_merge_target_rev is None
assert pull_request.last_merge_status is None
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is True
assert msg == "This pull request can be automatically merged."
self.merge_mock.assert_called_with(
self.repo_id,
self.workspace_id,
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts,
dry_run=True,
use_rebase=False,
close_branch=False,
)
assert pull_request._last_merge_source_rev == self.source_commit
assert pull_request._last_merge_target_rev == self.target_commit
assert pull_request.last_merge_status is MergeFailureReason.NONE
self.merge_mock.reset_mock()
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is True
assert msg == "This pull request can be automatically merged."
assert self.merge_mock.called is False
def test_merge_status_known_failure(self, pull_request):
self.merge_mock.return_value = MergeResponse(
False, False, None, MergeFailureReason.MERGE_FAILED, metadata={"unresolved_files": "file1"}
)
assert pull_request._last_merge_source_rev is None
assert pull_request._last_merge_target_rev is None
assert pull_request.last_merge_status is None
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == "This pull request cannot be merged because of merge conflicts. file1"
self.merge_mock.assert_called_with(
self.repo_id,
self.workspace_id,
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts,
dry_run=True,
use_rebase=False,
close_branch=False,
)
assert pull_request._last_merge_source_rev == self.source_commit
assert pull_request._last_merge_target_rev == self.target_commit
assert pull_request.last_merge_status is MergeFailureReason.MERGE_FAILED
self.merge_mock.reset_mock()
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == "This pull request cannot be merged because of merge conflicts. file1"
assert self.merge_mock.called is False
def test_merge_status_unknown_failure(self, pull_request):
self.merge_mock.return_value = MergeResponse(
False, False, None, MergeFailureReason.UNKNOWN, metadata={"exception": "MockError"}
)
assert pull_request._last_merge_source_rev is None
assert pull_request._last_merge_target_rev is None
assert pull_request.last_merge_status is None
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == "This pull request cannot be merged because of an unhandled exception. MockError"
self.merge_mock.assert_called_with(
self.repo_id,
self.workspace_id,
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts,
dry_run=True,
use_rebase=False,
close_branch=False,
)
assert pull_request._last_merge_source_rev is None
assert pull_request._last_merge_target_rev is None
assert pull_request.last_merge_status is None
self.merge_mock.reset_mock()
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == "This pull request cannot be merged because of an unhandled exception. MockError"
assert self.merge_mock.called is True
def test_merge_status_when_target_is_locked(self, pull_request):
pull_request.target_repo.locked = [1, "12345.50", "lock_web"]
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == "This pull request cannot be merged because the target repository is locked by user:1."
def test_merge_status_requirements_check_target(self, pull_request):
def has_largefiles(self, repo):
return repo == pull_request.source_repo
patcher = mock.patch.object(PullRequestModel, "_has_largefiles", has_largefiles)
with patcher:
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == "Target repository large files support is disabled."
def test_merge_status_requirements_check_source(self, pull_request):
def has_largefiles(self, repo):
return repo == pull_request.target_repo
patcher = mock.patch.object(PullRequestModel, "_has_largefiles", has_largefiles)
with patcher:
merge_response, status, msg = PullRequestModel().merge_status(pull_request)
assert status is False
assert msg == "Source repository large files support is disabled."
def test_merge(self, pull_request, merge_extras):
user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
merge_ref = Reference("type", "name", "6126b7bfcc82ad2d3deaee22af926b082ce54cc6")
self.merge_mock.return_value = MergeResponse(True, True, merge_ref, MergeFailureReason.NONE)
merge_extras["repository"] = pull_request.target_repo.repo_name
PullRequestModel().merge_repo(pull_request, pull_request.author, extras=merge_extras)
Session().commit()
message = "Merge pull request !{pr_id} from {source_repo} {source_ref_name}" "\n\n {pr_title}".format(
pr_id=pull_request.pull_request_id,
source_repo=safe_str(pull_request.source_repo.scm_instance().name),
source_ref_name=pull_request.source_ref_parts.name,
pr_title=safe_str(pull_request.title),
)
self.merge_mock.assert_called_with(
self.repo_id,
self.workspace_id,
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts,
user_name=user.short_contact,
user_email=user.email,
message=message,
use_rebase=False,
close_branch=False,
)
self.invalidation_mock.assert_called_once_with(pull_request.target_repo.repo_name)
self.hook_mock.assert_called_with(self.pull_request, self.pull_request.author, "merge")
pull_request = PullRequest.get(pull_request.pull_request_id)
assert pull_request.merge_rev == "6126b7bfcc82ad2d3deaee22af926b082ce54cc6"
def test_merge_with_status_lock(self, pull_request, merge_extras):
user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
merge_ref = Reference("type", "name", "6126b7bfcc82ad2d3deaee22af926b082ce54cc6")
self.merge_mock.return_value = MergeResponse(True, True, merge_ref, MergeFailureReason.NONE)
merge_extras["repository"] = pull_request.target_repo.repo_name
with pull_request.set_state(PullRequest.STATE_UPDATING):
assert pull_request.pull_request_state == PullRequest.STATE_UPDATING
PullRequestModel().merge_repo(pull_request, pull_request.author, extras=merge_extras)
Session().commit()
assert pull_request.pull_request_state == PullRequest.STATE_CREATED
message = "Merge pull request !{pr_id} from {source_repo} {source_ref_name}" "\n\n {pr_title}".format(
pr_id=pull_request.pull_request_id,
source_repo=safe_str(pull_request.source_repo.scm_instance().name),
source_ref_name=pull_request.source_ref_parts.name,
pr_title=safe_str(pull_request.title),
)
self.merge_mock.assert_called_with(
self.repo_id,
self.workspace_id,
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts,
user_name=user.short_contact,
user_email=user.email,
message=message,
use_rebase=False,
close_branch=False,
)
self.invalidation_mock.assert_called_once_with(pull_request.target_repo.repo_name)
self.hook_mock.assert_called_with(self.pull_request, self.pull_request.author, "merge")
pull_request = PullRequest.get(pull_request.pull_request_id)
assert pull_request.merge_rev == "6126b7bfcc82ad2d3deaee22af926b082ce54cc6"
def test_merge_failed(self, pull_request, merge_extras):
user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
merge_ref = Reference("type", "name", "6126b7bfcc82ad2d3deaee22af926b082ce54cc6")
self.merge_mock.return_value = MergeResponse(False, False, merge_ref, MergeFailureReason.MERGE_FAILED)
merge_extras["repository"] = pull_request.target_repo.repo_name
PullRequestModel().merge_repo(pull_request, pull_request.author, extras=merge_extras)
Session().commit()
message = "Merge pull request !{pr_id} from {source_repo} {source_ref_name}" "\n\n {pr_title}".format(
pr_id=pull_request.pull_request_id,
source_repo=safe_str(pull_request.source_repo.scm_instance().name),
source_ref_name=pull_request.source_ref_parts.name,
pr_title=safe_str(pull_request.title),
)
self.merge_mock.assert_called_with(
self.repo_id,
self.workspace_id,
pull_request.target_ref_parts,
pull_request.source_repo.scm_instance(),
pull_request.source_ref_parts,
user_name=user.short_contact,
user_email=user.email,
message=message,
use_rebase=False,
close_branch=False,
)
pull_request = PullRequest.get(pull_request.pull_request_id)
assert self.invalidation_mock.called is False
assert pull_request.merge_rev is None
def test_get_commit_ids(self, pull_request):
# The PR has been not merged yet, so expect an exception
with pytest.raises(ValueError):
PullRequestModel()._get_commit_ids(pull_request)
# Merge revision is in the revisions list
pull_request.merge_rev = pull_request.revisions[0]
commit_ids = PullRequestModel()._get_commit_ids(pull_request)
assert commit_ids == pull_request.revisions
# Merge revision is not in the revisions list
pull_request.merge_rev = "f000" * 10
commit_ids = PullRequestModel()._get_commit_ids(pull_request)
assert commit_ids == pull_request.revisions + [pull_request.merge_rev]
def test_get_diff_from_pr_version(self, pull_request):
source_repo = pull_request.source_repo
source_ref_id = pull_request.source_ref_parts.commit_id
target_ref_id = pull_request.target_ref_parts.commit_id
diff = PullRequestModel()._get_diff_from_pr_or_version(
source_repo, source_ref_id, target_ref_id, hide_whitespace_changes=False, diff_context=6
)
assert b"file_1" in diff.raw.tobytes()
def test_generate_title_returns_unicode(self):
title = PullRequestModel().generate_pullrequest_title(
source="source-dummy",
source_ref="source-ref-dummy",
target="target-dummy",
)
assert type(title) == str
@pytest.mark.parametrize(
"title, has_wip",
[
("hello", False),
("hello wip", False),
("hello wip: xxx", False),
("[wip] hello", True),
("[wip] hello", True),
("wip: hello", True),
("wip hello", True),
],
)
def test_wip_title_marker(self, pull_request, title, has_wip):
pull_request.title = title
assert pull_request.work_in_progress == has_wip
@pytest.mark.usefixtures("config_stub")
class TestIntegrationMerge(object):
def test_merge_fails_if_target_is_locked(self, pr_util, user_regular, merge_extras):
pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
locked_by = [user_regular.user_id + 1, 12345.50, "lock_web"]
pull_request.target_repo.locked = locked_by
# TODO: johbo: Check if this can work based on the database, currently
# all data is pre-computed, that's why just updating the DB is not
# enough.
merge_extras["locked_by"] = locked_by
merge_extras["repository"] = pull_request.target_repo.repo_name
# TODO: johbo: Needed for sqlite, try to find an automatic way for it
Session().commit()
merge_status = PullRequestModel().merge_repo(pull_request, user_regular, extras=merge_extras)
Session().commit()
assert not merge_status.executed
@pytest.mark.parametrize(
"use_outdated, inlines_count, outdated_count",
[
(False, 1, 0),
(True, 0, 1),
],
)
def test_outdated_comments(pr_util, use_outdated, inlines_count, outdated_count, config_stub):
pull_request = pr_util.create_pull_request()
pr_util.create_inline_comment(file_path="not_in_updated_diff")
with outdated_comments_patcher(use_outdated) as outdated_comment_mock:
pr_util.add_one_commit()
assert_inline_comments(pull_request, visible=inlines_count, outdated=outdated_count)
outdated_comment_mock.assert_called_with(pull_request)
@pytest.mark.parametrize(
"mr_type, expected_msg",
[
(MergeFailureReason.NONE, "This pull request can be automatically merged."),
(MergeFailureReason.UNKNOWN, "This pull request cannot be merged because of an unhandled exception. CRASH"),
(
MergeFailureReason.MERGE_FAILED,
"This pull request cannot be merged because of merge conflicts. CONFLICT_FILE",
),
(
MergeFailureReason.PUSH_FAILED,
"This pull request could not be merged because push to target:`some-repo@merge_commit` failed.",
),
(
MergeFailureReason.TARGET_IS_NOT_HEAD,
"This pull request cannot be merged because the target `ref_name` is not a head.",
),
(
MergeFailureReason.HG_SOURCE_HAS_MORE_BRANCHES,
"This pull request cannot be merged because the source contains more branches than the target.",
),
(
MergeFailureReason.HG_TARGET_HAS_MULTIPLE_HEADS,
"This pull request cannot be merged because the target `ref_name` has multiple heads: `a,b,c`.",
),
(
MergeFailureReason.TARGET_IS_LOCKED,
"This pull request cannot be merged because the target repository is locked by user:123.",
),
(
MergeFailureReason.MISSING_TARGET_REF,
"This pull request cannot be merged because the target reference `ref_name` is missing.",
),
(
MergeFailureReason.MISSING_SOURCE_REF,
"This pull request cannot be merged because the source reference `ref_name` is missing.",
),
(
MergeFailureReason.SUBREPO_MERGE_FAILED,
"This pull request cannot be merged because of conflicts related to sub repositories.",
),
],
)
def test_merge_response_message(mr_type, expected_msg):
merge_ref = Reference("type", "ref_name", "6126b7bfcc82ad2d3deaee22af926b082ce54cc6")
metadata = {
"unresolved_files": "CONFLICT_FILE",
"exception": "CRASH",
"target": "some-repo",
"merge_commit": "merge_commit",
"target_ref": merge_ref,
"source_ref": merge_ref,
"heads": ",".join(["a", "b", "c"]),
"locked_by": "user:123",
}
merge_response = MergeResponse(True, True, merge_ref, mr_type, metadata=metadata)
assert merge_response.merge_status_message == expected_msg
@pytest.fixture()
def merge_extras(request, user_regular):
"""
Context for the vcs operation when running a merge.
"""
extras = {
"ip": "127.0.0.1",
"username": user_regular.username,
"user_id": user_regular.user_id,
"action": "push",
"repository": "fake_target_repo_name",
"scm": "git",
"config": request.config.getini("pyramid_config"),
"repo_store": "",
"make_lock": None,
"locked_by": [None, None, None],
"server_url": "http://test.example.com:5000",
"hooks": ["push", "pull"],
"is_shadow_repo": False,
}
return extras
@pytest.mark.usefixtures("config_stub")
class TestUpdateCommentHandling(object):
@pytest.fixture(autouse=True, scope="class")
def enable_outdated_comments(self, request, baseapp):
config_patch = mock.patch.dict("rhodecode.CONFIG", {"rhodecode_use_outdated_comments": True})
config_patch.start()
@request.addfinalizer
def cleanup():
config_patch.stop()
def test_comment_stays_unflagged_on_unchanged_diff(self, pr_util):
commits = [
{"message": "a"},
{"message": "b", "added": [FileNode(b"file_b", b"test_content\n")]},
{"message": "c", "added": [FileNode(b"file_c", b"test_content\n")]},
]
pull_request = pr_util.create_pull_request(commits=commits, target_head="a", source_head="b", revisions=["b"])
pr_util.create_inline_comment(file_path="file_b")
pr_util.add_one_commit(head="c")
assert_inline_comments(pull_request, visible=1, outdated=0)
def test_comment_stays_unflagged_on_change_above(self, pr_util):
original_content = b"".join((b"line %d\n" % x for x in range(1, 11)))
updated_content = b"new_line_at_top\n" + original_content
commits = [
{"message": "a"},
{"message": "b", "added": [FileNode(b"file_b", original_content)]},
{"message": "c", "changed": [FileNode(b"file_b", updated_content)]},
]
pull_request = pr_util.create_pull_request(commits=commits, target_head="a", source_head="b", revisions=["b"])
with outdated_comments_patcher():
comment = pr_util.create_inline_comment(line_no="n8", file_path="file_b")
pr_util.add_one_commit(head="c")
assert_inline_comments(pull_request, visible=1, outdated=0)
assert comment.line_no == "n9"
def test_comment_stays_unflagged_on_change_below(self, pr_util):
original_content = b"".join([b"line %d\n" % x for x in range(10)])
updated_content = original_content + b"new_line_at_end\n"
commits = [
{"message": "a"},
{"message": "b", "added": [FileNode(b"file_b", original_content)]},
{"message": "c", "changed": [FileNode(b"file_b", updated_content)]},
]
pull_request = pr_util.create_pull_request(commits=commits, target_head="a", source_head="b", revisions=["b"])
pr_util.create_inline_comment(file_path="file_b")
pr_util.add_one_commit(head="c")
assert_inline_comments(pull_request, visible=1, outdated=0)
@pytest.mark.parametrize("line_no", ["n4", "o4", "n10", "o9"])
def test_comment_flagged_on_change_around_context(self, pr_util, line_no):
base_lines = [b"line %d\n" % x for x in range(1, 13)]
change_lines = list(base_lines)
change_lines.insert(6, b"line 6a added\n")
# Changes on the last line of sight
update_lines = list(change_lines)
update_lines[0] = b"line 1 changed\n"
update_lines[-1] = b"line 12 changed\n"
def file_b(lines):
return FileNode(b"file_b", b"".join(lines))
commits = [
{"message": "a", "added": [file_b(base_lines)]},
{"message": "b", "changed": [file_b(change_lines)]},
{"message": "c", "changed": [file_b(update_lines)]},
]
pull_request = pr_util.create_pull_request(commits=commits, target_head="a", source_head="b", revisions=["b"])
pr_util.create_inline_comment(line_no=line_no, file_path="file_b")
with outdated_comments_patcher():
pr_util.add_one_commit(head="c")
assert_inline_comments(pull_request, visible=0, outdated=1)
@pytest.mark.parametrize(
"change, content",
[
("changed", b"changed\n"),
("removed", b""),
],
ids=["changed", b"removed"],
)
def test_comment_flagged_on_change(self, pr_util, change, content):
commits = [
{"message": "a"},
{"message": "b", "added": [FileNode(b"file_b", b"test_content\n")]},
{"message": "c", change: [FileNode(b"file_b", content)]},
]
pull_request = pr_util.create_pull_request(commits=commits, target_head="a", source_head="b", revisions=["b"])
pr_util.create_inline_comment(file_path="file_b")
with outdated_comments_patcher():
pr_util.add_one_commit(head="c")
assert_inline_comments(pull_request, visible=0, outdated=1)
@pytest.mark.usefixtures("config_stub")
class TestUpdateChangedFiles(object):
def test_no_changes_on_unchanged_diff(self, pr_util):
commits = [
{"message": "a"},
{"message": "b", "added": [FileNode(b"file_b", b"test_content b\n")]},
{"message": "c", "added": [FileNode(b"file_c", b"test_content c\n")]},
]
# open a PR from a to b, adding file_b
pull_request = pr_util.create_pull_request(
commits=commits, target_head="a", source_head="b", revisions=["b"], name_suffix="per-file-review"
)
# modify PR adding new file file_c
pr_util.add_one_commit(head="c")
assert_pr_file_changes(pull_request, added=["file_c"], modified=[], removed=[])
def test_modify_and_undo_modification_diff(self, pr_util):
commits = [
{"message": "a"},
{"message": "b", "added": [FileNode(b"file_b", b"test_content b\n")]},
{"message": "c", "changed": [FileNode(b"file_b", b"test_content b modified\n")]},
{"message": "d", "changed": [FileNode(b"file_b", b"test_content b\n")]},
]
# open a PR from a to b, adding file_b
pull_request = pr_util.create_pull_request(
commits=commits, target_head="a", source_head="b", revisions=["b"], name_suffix="per-file-review"
)
# modify PR modifying file file_b
pr_util.add_one_commit(head="c")
assert_pr_file_changes(pull_request, added=[], modified=["file_b"], removed=[])
# move the head again to d, which rollbacks change,
# meaning we should indicate no changes
pr_util.add_one_commit(head="d")
assert_pr_file_changes(pull_request, added=[], modified=[], removed=[])
def test_updated_all_files_in_pr(self, pr_util):
commits = [
{"message": "a"},
{
"message": "b",
"added": [
FileNode(b"file_a", b"test_content a\n"),
FileNode(b"file_b", b"test_content b\n"),
FileNode(b"file_c", b"test_content c\n"),
],
},
{
"message": "c",
"changed": [
FileNode(b"file_a", b"test_content a changed\n"),
FileNode(b"file_b", b"test_content b changed\n"),
FileNode(b"file_c", b"test_content c changed\n"),
],
},
]
# open a PR from a to b, changing 3 files
pull_request = pr_util.create_pull_request(
commits=commits, target_head="a", source_head="b", revisions=["b"], name_suffix="per-file-review"
)
pr_util.add_one_commit(head="c")
assert_pr_file_changes(pull_request, added=[], modified=["file_a", "file_b", "file_c"], removed=[])
def test_updated_and_removed_all_files_in_pr(self, pr_util):
commits = [
{"message": "a"},
{
"message": "b",
"added": [
FileNode(b"file_a", b"test_content a\n"),
FileNode(b"file_b", b"test_content b\n"),
FileNode(b"file_c", b"test_content c\n"),
],
},
{
"message": "c",
"removed": [
FileNode(b"file_a", b"test_content a changed\n"),
FileNode(b"file_b", b"test_content b changed\n"),
FileNode(b"file_c", b"test_content c changed\n"),
],
},
]
# open a PR from a to b, removing 3 files
pull_request = pr_util.create_pull_request(
commits=commits, target_head="a", source_head="b", revisions=["b"], name_suffix="per-file-review"
)
pr_util.add_one_commit(head="c")
assert_pr_file_changes(pull_request, added=[], modified=[], removed=["file_a", "file_b", "file_c"])
def test_update_writes_snapshot_into_pull_request_version(pr_util, config_stub):
model = PullRequestModel()
pull_request = pr_util.create_pull_request()
pr_util.update_source_repository()
model.update_commits(pull_request, pull_request.author)
# Expect that it has a version entry now
assert len(model.get_versions(pull_request)) == 1
def test_update_skips_new_version_if_unchanged(pr_util, config_stub):
pull_request = pr_util.create_pull_request()
model = PullRequestModel()
model.update_commits(pull_request, pull_request.author)
# Expect that it still has no versions
assert len(model.get_versions(pull_request)) == 0
def test_update_assigns_comments_to_the_new_version(pr_util, config_stub):
model = PullRequestModel()
pull_request = pr_util.create_pull_request()
comment = pr_util.create_comment()
pr_util.update_source_repository()
model.update_commits(pull_request, pull_request.author)
# Expect that the comment is linked to the pr version now
assert comment.pull_request_version == model.get_versions(pull_request)[0]
def test_update_adds_a_comment_to_the_pull_request_about_the_change(pr_util, config_stub):
model = PullRequestModel()
pull_request = pr_util.create_pull_request()
pr_util.update_source_repository()
pr_util.update_source_repository()
update_response = model.update_commits(pull_request, pull_request.author)
commit_id = update_response.common_ancestor_id
# Expect to find a new comment about the change
expected_message = textwrap.dedent(
"""\
Pull request updated. Auto status change to |under_review|
.. role:: added
.. role:: removed
.. parsed-literal::
Changed commits:
* :added:`1 added`
* :removed:`0 removed`
Changed files:
* `A file_2 <#a_c-{}-92ed3b5f07b4>`_
.. |under_review| replace:: *"Under Review"*"""
).format(commit_id[:12])
pull_request_comments = sorted(pull_request.comments, key=lambda c: c.modified_at)
update_comment = pull_request_comments[-1]
assert update_comment.text == expected_message
def test_create_version_from_snapshot_updates_attributes(pr_util, config_stub):
pull_request = pr_util.create_pull_request()
# Avoiding default values
pull_request.status = PullRequest.STATUS_CLOSED
pull_request._last_merge_source_rev = "0" * 40
pull_request._last_merge_target_rev = "1" * 40
pull_request.last_merge_status = 1
pull_request.merge_rev = "2" * 40
# Remember automatic values
created_on = pull_request.created_on
updated_on = pull_request.updated_on
# Create a new version of the pull request
version = PullRequestModel()._create_version_from_snapshot(pull_request)
# Check attributes
assert version.title == pr_util.create_parameters["title"]
assert version.description == pr_util.create_parameters["description"]
assert version.status == PullRequest.STATUS_CLOSED
# versions get updated created_on
assert version.created_on != created_on
assert version.updated_on == updated_on
assert version.user_id == pull_request.user_id
assert version.revisions == pr_util.create_parameters["revisions"]
assert version.source_repo == pr_util.source_repository
assert version.source_ref == pr_util.create_parameters["source_ref"]
assert version.target_repo == pr_util.target_repository
assert version.target_ref == pr_util.create_parameters["target_ref"]
assert version._last_merge_source_rev == pull_request._last_merge_source_rev
assert version._last_merge_target_rev == pull_request._last_merge_target_rev
assert version.last_merge_status == pull_request.last_merge_status
assert version.merge_rev == pull_request.merge_rev
assert version.pull_request == pull_request
def test_link_comments_to_version_only_updates_unlinked_comments(pr_util, config_stub):
version1 = pr_util.create_version_of_pull_request()
comment_linked = pr_util.create_comment(linked_to=version1)
comment_unlinked = pr_util.create_comment()
version2 = pr_util.create_version_of_pull_request()
PullRequestModel()._link_comments_to_version(version2)
Session().commit()
# Expect that only the new comment is linked to version2
assert comment_unlinked.pull_request_version_id == version2.pull_request_version_id
assert comment_linked.pull_request_version_id == version1.pull_request_version_id
assert comment_unlinked.pull_request_version_id != comment_linked.pull_request_version_id
def test_calculate_commits():
old_ids = [1, 2, 3]
new_ids = [1, 3, 4, 5]
change = PullRequestModel()._calculate_commit_id_changes(old_ids, new_ids)
assert change.added == [4, 5]
assert change.common == [1, 3]
assert change.removed == [2]
assert change.total == [1, 3, 4, 5]
def assert_inline_comments(pull_request, visible=None, outdated=None):
if visible is not None:
inline_comments = CommentsModel().get_inline_comments(
pull_request.target_repo.repo_id, pull_request=pull_request
)
inline_cnt = len(CommentsModel().get_inline_comments_as_list(inline_comments))
assert inline_cnt == visible
if outdated is not None:
outdated_comments = CommentsModel().get_outdated_comments(pull_request.target_repo.repo_id, pull_request)
assert len(outdated_comments) == outdated
def assert_pr_file_changes(pull_request, added=None, modified=None, removed=None):
pr_versions = PullRequestModel().get_versions(pull_request)
# always use first version, ie original PR to calculate changes
pull_request_version = pr_versions[0]
old_diff_data, new_diff_data = PullRequestModel()._generate_update_diffs(pull_request, pull_request_version)
file_changes = PullRequestModel()._calculate_file_changes(old_diff_data, new_diff_data)
assert added == file_changes.added, "expected added:%s vs value:%s" % (added, file_changes.added)
assert modified == file_changes.modified, "expected modified:%s vs value:%s" % (modified, file_changes.modified)
assert removed == file_changes.removed, "expected removed:%s vs value:%s" % (removed, file_changes.removed)
def outdated_comments_patcher(use_outdated=True):
return mock.patch.object(CommentsModel, "use_outdated_comments", return_value=use_outdated)