diff --git a/rhodecode/api/tests/test_create_pull_request.py b/rhodecode/api/tests/test_create_pull_request.py
--- a/rhodecode/api/tests/test_create_pull_request.py
+++ b/rhodecode/api/tests/test_create_pull_request.py
@@ -342,7 +342,7 @@ class TestCreatePullRequestApi(object):
commits = [
{'message': 'initial'},
{'message': 'change'},
- {'message': 'new-feature', 'parents': ['initial']},
+ {'message': 'new-feature', 'parents': ['initial'], 'branch': 'feature'},
]
self.commit_ids = backend.create_master_repo(commits)
self.source = backend.create_repo(heads=[source_head])
diff --git a/rhodecode/apps/repository/tests/test_repo_changelog.py b/rhodecode/apps/repository/tests/test_repo_changelog.py
--- a/rhodecode/apps/repository/tests/test_repo_changelog.py
+++ b/rhodecode/apps/repository/tests/test_repo_changelog.py
@@ -101,9 +101,9 @@ class TestChangelogController(TestContro
# although this is a parent of commit "b1". And branch "b" has commits
# which have a smaller index than commit "a1".
commits = [
- {'message': 'a'},
+ {'message': 'a', 'branch': 'master'},
{'message': 'b', 'branch': 'b'},
- {'message': 'a1', 'parents': ['a']},
+ {'message': 'a1', 'parents': ['a'], 'branch': 'master'},
{'message': 'b1', 'branch': 'b', 'parents': ['b', 'a1']},
]
backend.create_repo(commits)
diff --git a/rhodecode/apps/repository/tests/test_repo_compare.py b/rhodecode/apps/repository/tests/test_repo_compare.py
--- a/rhodecode/apps/repository/tests/test_repo_compare.py
+++ b/rhodecode/apps/repository/tests/test_repo_compare.py
@@ -70,11 +70,11 @@ class TestCompareView(object):
commit3 = commit_change(
fork.repo_name, filename=b'file1', content=b'D',
- message='D, child of A', vcs_type=backend.alias, parent=commit0)
+ message='D, child of A', vcs_type=backend.alias, parent=commit0, branch='feature')
commit4 = commit_change(
fork.repo_name, filename=b'file1', content=b'E',
- message='E, child of D', vcs_type=backend.alias, parent=commit3)
+ message='E, child of D', vcs_type=backend.alias, parent=commit3, branch='feature')
# prepare origin repository, taking just the history up to D
diff --git a/rhodecode/apps/repository/tests/test_repo_pullrequests.py b/rhodecode/apps/repository/tests/test_repo_pullrequests.py
--- a/rhodecode/apps/repository/tests/test_repo_pullrequests.py
+++ b/rhodecode/apps/repository/tests/test_repo_pullrequests.py
@@ -1,4 +1,3 @@
-
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
@@ -21,57 +20,73 @@ import pytest
import rhodecode
from rhodecode.lib import helpers as h
-from rhodecode.lib.vcs.backends.base import MergeResponse, MergeFailureReason
+from rhodecode.lib.vcs.backends.base import MergeResponse, MergeFailureReason, Reference
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.lib.ext_json import json
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.db import (
- PullRequest, ChangesetStatus, UserLog, Notification, ChangesetComment, Repository)
+ PullRequest,
+ ChangesetStatus,
+ UserLog,
+ Notification,
+ ChangesetComment,
+ Repository,
+)
from rhodecode.model.meta import Session
from rhodecode.model.pull_request import PullRequestModel
from rhodecode.model.user import UserModel
from rhodecode.model.comment import CommentsModel
from rhodecode.tests import (
- assert_session_flash, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN)
+ assert_session_flash,
+ TEST_USER_ADMIN_LOGIN,
+ TEST_USER_REGULAR_LOGIN,
+)
+from rhodecode.tests.fixture_mods.fixture_utils import PRTestUtility
from rhodecode.tests.routes import route_path
-@pytest.mark.usefixtures('app', 'autologin_user')
+@pytest.mark.usefixtures("app", "autologin_user")
@pytest.mark.backends("git", "hg")
class TestPullrequestsView(object):
-
def test_index(self, backend):
- self.app.get(route_path(
- 'pullrequest_new',
- repo_name=backend.repo_name))
+ self.app.get(route_path("pullrequest_new", repo_name=backend.repo_name))
def test_option_menu_create_pull_request_exists(self, backend):
repo_name = backend.repo_name
- response = self.app.get(h.route_path('repo_summary', repo_name=repo_name))
+ response = self.app.get(h.route_path("repo_summary", repo_name=repo_name))
create_pr_link = 'Create Pull Request' % route_path(
- 'pullrequest_new', repo_name=repo_name)
+ "pullrequest_new", repo_name=repo_name
+ )
response.mustcontain(create_pr_link)
def test_create_pr_form_with_raw_commit_id(self, backend):
repo = backend.repo
self.app.get(
- route_path('pullrequest_new', repo_name=repo.repo_name,
- commit=repo.get_commit().raw_id),
- status=200)
+ route_path(
+ "pullrequest_new",
+ repo_name=repo.repo_name,
+ commit=repo.get_commit().raw_id,
+ ),
+ status=200,
+ )
- @pytest.mark.parametrize('pr_merge_enabled', [True, False])
- @pytest.mark.parametrize('range_diff', ["0", "1"])
+ @pytest.mark.parametrize("pr_merge_enabled", [True, False])
+ @pytest.mark.parametrize("range_diff", ["0", "1"])
def test_show(self, pr_util, pr_merge_enabled, range_diff):
pull_request = pr_util.create_pull_request(
- mergeable=pr_merge_enabled, enable_notifications=False)
+ mergeable=pr_merge_enabled, enable_notifications=False
+ )
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id,
- params={'range-diff': range_diff}))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ params={"range-diff": range_diff},
+ )
+ )
for commit_id in pull_request.revisions:
response.mustcontain(commit_id)
@@ -82,137 +97,168 @@ class TestPullrequestsView(object):
response.mustcontain('class="pull-request-merge"')
if pr_merge_enabled:
- response.mustcontain('Pull request reviewer approval is pending')
+ response.mustcontain("Pull request reviewer approval is pending")
else:
- response.mustcontain('Server-side pull request merging is disabled.')
+ response.mustcontain("Server-side pull request merging is disabled.")
if range_diff == "1":
- response.mustcontain('Turn off: Show the diff as commit range')
+ response.mustcontain("Turn off: Show the diff as commit range")
def test_show_versions_of_pr(self, backend, csrf_token):
commits = [
- {'message': 'initial-commit',
- 'added': [FileNode(b'test-file.txt', b'LINE1\n')]},
-
- {'message': 'commit-1',
- 'changed': [FileNode(b'test-file.txt', b'LINE1\nLINE2\n')]},
+ {
+ "message": "initial-commit",
+ "added": [FileNode(b"test-file.txt", b"LINE1\n")],
+ },
+ {
+ "message": "commit-1",
+ "changed": [FileNode(b"test-file.txt", b"LINE1\nLINE2\n")],
+ },
# Above is the initial version of PR that changes a single line
-
# from now on we'll add 3x commit adding a nother line on each step
- {'message': 'commit-2',
- 'changed': [FileNode(b'test-file.txt', b'LINE1\nLINE2\nLINE3\n')]},
-
- {'message': 'commit-3',
- 'changed': [FileNode(b'test-file.txt', b'LINE1\nLINE2\nLINE3\nLINE4\n')]},
-
- {'message': 'commit-4',
- 'changed': [FileNode(b'test-file.txt', b'LINE1\nLINE2\nLINE3\nLINE4\nLINE5\n')]},
+ {
+ "message": "commit-2",
+ "changed": [FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\n")],
+ },
+ {
+ "message": "commit-3",
+ "changed": [
+ FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\nLINE4\n")
+ ],
+ },
+ {
+ "message": "commit-4",
+ "changed": [
+ FileNode(b"test-file.txt", b"LINE1\nLINE2\nLINE3\nLINE4\nLINE5\n")
+ ],
+ },
]
commit_ids = backend.create_master_repo(commits)
- target = backend.create_repo(heads=['initial-commit'])
- source = backend.create_repo(heads=['commit-1'])
+ target = backend.create_repo(heads=["initial-commit"])
+ source = backend.create_repo(heads=["commit-1"])
source_repo_name = source.repo_name
target_repo_name = target.repo_name
- target_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['initial-commit'])
- source_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['commit-1'])
+ target_ref = "branch:{branch}:{commit_id}".format(
+ branch=backend.default_branch_name, commit_id=commit_ids["initial-commit"]
+ )
+ source_ref = "branch:{branch}:{commit_id}".format(
+ branch=backend.default_branch_name, commit_id=commit_ids["commit-1"]
+ )
response = self.app.post(
- route_path('pullrequest_create', repo_name=source.repo_name),
+ route_path("pullrequest_create", repo_name=source.repo_name),
[
- ('source_repo', source_repo_name),
- ('source_ref', source_ref),
- ('target_repo', target_repo_name),
- ('target_ref', target_ref),
- ('common_ancestor', commit_ids['initial-commit']),
- ('pullrequest_title', 'Title'),
- ('pullrequest_desc', 'Description'),
- ('description_renderer', 'markdown'),
- ('__start__', 'review_members:sequence'),
- ('__start__', 'reviewer:mapping'),
- ('user_id', '1'),
- ('__start__', 'reasons:sequence'),
- ('reason', 'Some reason'),
- ('__end__', 'reasons:sequence'),
- ('__start__', 'rules:sequence'),
- ('__end__', 'rules:sequence'),
- ('mandatory', 'False'),
- ('__end__', 'reviewer:mapping'),
- ('__end__', 'review_members:sequence'),
- ('__start__', 'revisions:sequence'),
- ('revisions', commit_ids['commit-1']),
- ('__end__', 'revisions:sequence'),
- ('user', ''),
- ('csrf_token', csrf_token),
+ ("source_repo", source_repo_name),
+ ("source_ref", source_ref),
+ ("target_repo", target_repo_name),
+ ("target_ref", target_ref),
+ ("common_ancestor", commit_ids["initial-commit"]),
+ ("pullrequest_title", "Title"),
+ ("pullrequest_desc", "Description"),
+ ("description_renderer", "markdown"),
+ ("__start__", "review_members:sequence"),
+ ("__start__", "reviewer:mapping"),
+ ("user_id", "1"),
+ ("__start__", "reasons:sequence"),
+ ("reason", "Some reason"),
+ ("__end__", "reasons:sequence"),
+ ("__start__", "rules:sequence"),
+ ("__end__", "rules:sequence"),
+ ("mandatory", "False"),
+ ("__end__", "reviewer:mapping"),
+ ("__end__", "review_members:sequence"),
+ ("__start__", "revisions:sequence"),
+ ("revisions", commit_ids["commit-1"]),
+ ("__end__", "revisions:sequence"),
+ ("user", ""),
+ ("csrf_token", csrf_token),
],
- status=302)
+ status=302,
+ )
- location = response.headers['Location']
+ location = response.headers["Location"]
- pull_request_id = location.rsplit('/', 1)[1]
- assert pull_request_id != 'new'
+ pull_request_id = location.rsplit("/", 1)[1]
+ assert pull_request_id != "new"
pull_request = PullRequest.get(int(pull_request_id))
pull_request_id = pull_request.pull_request_id
# Show initial version of PR
response = self.app.get(
- route_path('pullrequest_show',
- repo_name=target_repo_name,
- pull_request_id=pull_request_id))
+ route_path(
+ "pullrequest_show",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ )
+ )
- response.mustcontain('commit-1')
- response.mustcontain(no=['commit-2'])
- response.mustcontain(no=['commit-3'])
- response.mustcontain(no=['commit-4'])
+ response.mustcontain("commit-1")
+ response.mustcontain(no=["commit-2"])
+ response.mustcontain(no=["commit-3"])
+ response.mustcontain(no=["commit-4"])
response.mustcontain('cb-addition">LINE2')
- response.mustcontain(no=['LINE3'])
- response.mustcontain(no=['LINE4'])
- response.mustcontain(no=['LINE5'])
+ response.mustcontain(no=["LINE3"])
+ response.mustcontain(no=["LINE4"])
+ response.mustcontain(no=["LINE5"])
# update PR #1
source_repo = Repository.get_by_repo_name(source_repo_name)
- backend.pull_heads(source_repo, heads=['commit-2'])
+ backend.pull_heads(source_repo, heads=["commit-2"])
response = self.app.post(
- route_path('pullrequest_update',
- repo_name=target_repo_name, pull_request_id=pull_request_id),
- params={'update_commits': 'true', 'csrf_token': csrf_token})
+ route_path(
+ "pullrequest_update",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ ),
+ params={"update_commits": "true", "csrf_token": csrf_token},
+ )
# update PR #2
source_repo = Repository.get_by_repo_name(source_repo_name)
- backend.pull_heads(source_repo, heads=['commit-3'])
+ backend.pull_heads(source_repo, heads=["commit-3"])
response = self.app.post(
- route_path('pullrequest_update',
- repo_name=target_repo_name, pull_request_id=pull_request_id),
- params={'update_commits': 'true', 'csrf_token': csrf_token})
+ route_path(
+ "pullrequest_update",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ ),
+ params={"update_commits": "true", "csrf_token": csrf_token},
+ )
# update PR #3
source_repo = Repository.get_by_repo_name(source_repo_name)
- backend.pull_heads(source_repo, heads=['commit-4'])
+ backend.pull_heads(source_repo, heads=["commit-4"])
response = self.app.post(
- route_path('pullrequest_update',
- repo_name=target_repo_name, pull_request_id=pull_request_id),
- params={'update_commits': 'true', 'csrf_token': csrf_token})
+ route_path(
+ "pullrequest_update",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ ),
+ params={"update_commits": "true", "csrf_token": csrf_token},
+ )
# Show final version !
response = self.app.get(
- route_path('pullrequest_show',
- repo_name=target_repo_name,
- pull_request_id=pull_request_id))
+ route_path(
+ "pullrequest_show",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ )
+ )
# 3 updates, and the latest == 4
- response.mustcontain('4 versions available for this pull request')
- response.mustcontain(no=['rhodecode diff rendering error'])
+ response.mustcontain("4 versions available for this pull request")
+ response.mustcontain(no=["rhodecode diff rendering error"])
# initial show must have 3 commits, and 3 adds
- response.mustcontain('commit-1')
- response.mustcontain('commit-2')
- response.mustcontain('commit-3')
- response.mustcontain('commit-4')
+ response.mustcontain("commit-1")
+ response.mustcontain("commit-2")
+ response.mustcontain("commit-3")
+ response.mustcontain("commit-4")
response.mustcontain('cb-addition">LINE2')
response.mustcontain('cb-addition">LINE3')
@@ -229,32 +275,36 @@ class TestPullrequestsView(object):
return 'cb-addition">{}'.format(text)
def cb_context(text):
- return '' \
- '{}'.format(text)
+ return (
+ ''
+ "{}".format(text)
+ )
commit_tests = {
# in response, not in response
- 1: (['commit-1'], ['commit-2', 'commit-3', 'commit-4']),
- 2: (['commit-1', 'commit-2'], ['commit-3', 'commit-4']),
- 3: (['commit-1', 'commit-2', 'commit-3'], ['commit-4']),
- 4: (['commit-1', 'commit-2', 'commit-3', 'commit-4'], []),
+ 1: (["commit-1"], ["commit-2", "commit-3", "commit-4"]),
+ 2: (["commit-1", "commit-2"], ["commit-3", "commit-4"]),
+ 3: (["commit-1", "commit-2", "commit-3"], ["commit-4"]),
+ 4: (["commit-1", "commit-2", "commit-3", "commit-4"], []),
}
diff_tests = {
- 1: (['LINE2'], ['LINE3', 'LINE4', 'LINE5']),
- 2: (['LINE2', 'LINE3'], ['LINE4', 'LINE5']),
- 3: (['LINE2', 'LINE3', 'LINE4'], ['LINE5']),
- 4: (['LINE2', 'LINE3', 'LINE4', 'LINE5'], []),
+ 1: (["LINE2"], ["LINE3", "LINE4", "LINE5"]),
+ 2: (["LINE2", "LINE3"], ["LINE4", "LINE5"]),
+ 3: (["LINE2", "LINE3", "LINE4"], ["LINE5"]),
+ 4: (["LINE2", "LINE3", "LINE4", "LINE5"], []),
}
for idx, ver in enumerate(versions, 1):
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ params={"version": ver},
+ )
+ )
- response = self.app.get(
- route_path('pullrequest_show',
- repo_name=target_repo_name,
- pull_request_id=pull_request_id,
- params={'version': ver}))
-
- response.mustcontain(no=['rhodecode diff rendering error'])
- response.mustcontain('Showing changes at v{}'.format(idx))
+ response.mustcontain(no=["rhodecode diff rendering error"])
+ response.mustcontain("Showing changes at v{}".format(idx))
yes, no = commit_tests[idx]
for y in yes:
@@ -270,24 +320,27 @@ class TestPullrequestsView(object):
# show diff between versions
diff_compare_tests = {
- 1: (['LINE3'], ['LINE1', 'LINE2']),
- 2: (['LINE3', 'LINE4'], ['LINE1', 'LINE2']),
- 3: (['LINE3', 'LINE4', 'LINE5'], ['LINE1', 'LINE2']),
+ 1: (["LINE3"], ["LINE1", "LINE2"]),
+ 2: (["LINE3", "LINE4"], ["LINE1", "LINE2"]),
+ 3: (["LINE3", "LINE4", "LINE5"], ["LINE1", "LINE2"]),
}
for idx, ver in enumerate(versions, 1):
adds, context = diff_compare_tests[idx]
- to_ver = ver+1
+ to_ver = ver + 1
if idx == 3:
- to_ver = 'latest'
+ to_ver = "latest"
response = self.app.get(
- route_path('pullrequest_show',
- repo_name=target_repo_name,
- pull_request_id=pull_request_id,
- params={'from_version': versions[0], 'version': to_ver}))
+ route_path(
+ "pullrequest_show",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ params={"from_version": versions[0], "version": to_ver},
+ )
+ )
- response.mustcontain(no=['rhodecode diff rendering error'])
+ response.mustcontain(no=["rhodecode diff rendering error"])
for a in adds:
response.mustcontain(cb_line(a))
@@ -296,123 +349,147 @@ class TestPullrequestsView(object):
# test version v2 -> v3
response = self.app.get(
- route_path('pullrequest_show',
- repo_name=target_repo_name,
- pull_request_id=pull_request_id,
- params={'from_version': versions[1], 'version': versions[2]}))
+ route_path(
+ "pullrequest_show",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ params={"from_version": versions[1], "version": versions[2]},
+ )
+ )
- response.mustcontain(cb_context('LINE1'))
- response.mustcontain(cb_context('LINE2'))
- response.mustcontain(cb_context('LINE3'))
- response.mustcontain(cb_line('LINE4'))
+ response.mustcontain(cb_context("LINE1"))
+ response.mustcontain(cb_context("LINE2"))
+ response.mustcontain(cb_context("LINE3"))
+ response.mustcontain(cb_line("LINE4"))
def test_close_status_visibility(self, pr_util, user_util, csrf_token):
# Logout
response = self.app.post(
- h.route_path('logout'),
- params={'csrf_token': csrf_token})
+ h.route_path("logout"), params={"csrf_token": csrf_token}
+ )
# Login as regular user
- response = self.app.post(h.route_path('login'),
- {'username': TEST_USER_REGULAR_LOGIN,
- 'password': 'test12'})
+ response = self.app.post(
+ h.route_path("login"),
+ {"username": TEST_USER_REGULAR_LOGIN, "password": "test12"},
+ )
+
+ pull_request = pr_util.create_pull_request(author=TEST_USER_REGULAR_LOGIN)
- pull_request = pr_util.create_pull_request(
- author=TEST_USER_REGULAR_LOGIN)
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
-
- response.mustcontain('Server-side pull request merging is disabled.')
+ response.mustcontain("Server-side pull request merging is disabled.")
assert_response = response.assert_response()
# for regular user without a merge permissions, we don't see it
- assert_response.no_element_exists('#close-pull-request-action')
+ assert_response.no_element_exists("#close-pull-request-action")
user_util.grant_user_permission_to_repo(
pull_request.target_repo,
UserModel().get_by_username(TEST_USER_REGULAR_LOGIN),
- 'repository.write')
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ "repository.write",
+ )
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
- response.mustcontain('Server-side pull request merging is disabled.')
+ response.mustcontain("Server-side pull request merging is disabled.")
assert_response = response.assert_response()
# now regular user has a merge permissions, we have CLOSE button
- assert_response.one_element_exists('#close-pull-request-action')
+ assert_response.one_element_exists("#close-pull-request-action")
def test_show_invalid_commit_id(self, pr_util):
# Simulating invalid revisions which will cause a lookup error
pull_request = pr_util.create_pull_request()
- pull_request.revisions = ['invalid']
+ pull_request.revisions = ["invalid"]
Session().add(pull_request)
Session().commit()
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
for commit_id in pull_request.revisions:
response.mustcontain(commit_id)
def test_show_invalid_source_reference(self, pr_util):
pull_request = pr_util.create_pull_request()
- pull_request.source_ref = 'branch:b:invalid'
+ pull_request.source_ref = "branch:b:invalid"
Session().add(pull_request)
Session().commit()
- self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
def test_edit_title_description(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
- route_path('pullrequest_update',
- repo_name=pull_request.target_repo.repo_name,
- pull_request_id=pull_request_id),
+ route_path(
+ "pullrequest_update",
+ repo_name=pull_request.target_repo.repo_name,
+ pull_request_id=pull_request_id,
+ ),
params={
- 'edit_pull_request': 'true',
- 'title': 'New title',
- 'description': 'New description',
- 'csrf_token': csrf_token})
+ "edit_pull_request": "true",
+ "title": "New title",
+ "description": "New description",
+ "csrf_token": csrf_token,
+ },
+ )
assert_session_flash(
- response, 'Pull request title & description updated.',
- category='success')
+ response, "Pull request title & description updated.", category="success"
+ )
pull_request = PullRequest.get(pull_request_id)
- assert pull_request.title == 'New title'
- assert pull_request.description == 'New description'
+ assert pull_request.title == "New title"
+ assert pull_request.description == "New description"
def test_edit_title_description_special(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
- route_path('pullrequest_update',
- repo_name=pull_request.target_repo.repo_name,
- pull_request_id=pull_request_id),
+ route_path(
+ "pullrequest_update",
+ repo_name=pull_request.target_repo.repo_name,
+ pull_request_id=pull_request_id,
+ ),
params={
- 'edit_pull_request': 'true',
- 'title': 'New title {} {2} {foo}',
- 'description': 'New description',
- 'csrf_token': csrf_token})
+ "edit_pull_request": "true",
+ "title": "New title {} {2} {foo}",
+ "description": "New description",
+ "csrf_token": csrf_token,
+ },
+ )
assert_session_flash(
- response, 'Pull request title & description updated.',
- category='success')
+ response, "Pull request title & description updated.", category="success"
+ )
pull_request = PullRequest.get(pull_request_id)
- assert pull_request.title_safe == 'New title {{}} {{2}} {{foo}}'
+ assert pull_request.title_safe == "New title {{}} {{2}} {{foo}}"
def test_edit_title_description_closed(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request()
@@ -421,144 +498,183 @@ class TestPullrequestsView(object):
pr_util.close()
response = self.app.post(
- route_path('pullrequest_update',
- repo_name=repo_name, pull_request_id=pull_request_id),
+ route_path(
+ "pullrequest_update",
+ repo_name=repo_name,
+ pull_request_id=pull_request_id,
+ ),
params={
- 'edit_pull_request': 'true',
- 'title': 'New title',
- 'description': 'New description',
- 'csrf_token': csrf_token}, status=200)
+ "edit_pull_request": "true",
+ "title": "New title",
+ "description": "New description",
+ "csrf_token": csrf_token,
+ },
+ status=200,
+ )
assert_session_flash(
- response, 'Cannot update closed pull requests.',
- category='error')
+ response, "Cannot update closed pull requests.", category="error"
+ )
def test_update_invalid_source_reference(self, pr_util, csrf_token):
from rhodecode.lib.vcs.backends.base import UpdateFailureReason
pull_request = pr_util.create_pull_request()
- pull_request.source_ref = 'branch:invalid-branch:invalid-commit-id'
+ pull_request.source_ref = "branch:invalid-branch:invalid-commit-id"
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
- route_path('pullrequest_update',
+ route_path(
+ "pullrequest_update",
repo_name=pull_request.target_repo.repo_name,
- pull_request_id=pull_request_id),
- params={'update_commits': 'true', 'csrf_token': csrf_token})
+ pull_request_id=pull_request_id,
+ ),
+ params={"update_commits": "true", "csrf_token": csrf_token},
+ )
- expected_msg = str(PullRequestModel.UPDATE_STATUS_MESSAGES[
- UpdateFailureReason.MISSING_SOURCE_REF])
- assert_session_flash(response, expected_msg, category='error')
+ expected_msg = str(
+ PullRequestModel.UPDATE_STATUS_MESSAGES[
+ UpdateFailureReason.MISSING_SOURCE_REF
+ ]
+ )
+ assert_session_flash(response, expected_msg, category="error")
def test_missing_target_reference(self, pr_util, csrf_token):
from rhodecode.lib.vcs.backends.base import MergeFailureReason
- pull_request = pr_util.create_pull_request(
- approved=True, mergeable=True)
- unicode_reference = 'branch:invalid-branch:invalid-commit-id'
+
+ pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
+ unicode_reference = "branch:invalid-branch:invalid-commit-id"
pull_request.target_ref = unicode_reference
Session().add(pull_request)
Session().commit()
pull_request_id = pull_request.pull_request_id
pull_request_url = route_path(
- 'pullrequest_show',
+ "pullrequest_show",
repo_name=pull_request.target_repo.repo_name,
- pull_request_id=pull_request_id)
+ pull_request_id=pull_request_id,
+ )
response = self.app.get(pull_request_url)
- target_ref_id = 'invalid-branch'
+ # target_ref_id = "invalid-branch"
+
merge_resp = MergeResponse(
- True, True, '', MergeFailureReason.MISSING_TARGET_REF,
- metadata={'target_ref': PullRequest.unicode_to_reference(unicode_reference)})
+ True,
+ True,
+ Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"),
+ MergeFailureReason.MISSING_TARGET_REF,
+ metadata={
+ "target_ref": PullRequest.unicode_to_reference(unicode_reference)
+ },
+ )
response.assert_response().element_contains(
- 'div[data-role="merge-message"]', merge_resp.merge_status_message)
+ 'div[data-role="merge-message"]', merge_resp.merge_status_message
+ )
def test_comment_and_close_pull_request_custom_message_approved(
- self, pr_util, csrf_token, xhr_header):
-
+ self, pr_util, csrf_token, xhr_header
+ ):
pull_request = pr_util.create_pull_request(approved=True)
pull_request_id = pull_request.pull_request_id
author = pull_request.user_id
repo = pull_request.target_repo.repo_id
self.app.post(
- route_path('pullrequest_comment_create',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request_id),
+ route_path(
+ "pullrequest_comment_create",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request_id,
+ ),
params={
- 'close_pull_request': '1',
- 'text': 'Closing a PR',
- 'csrf_token': csrf_token},
- extra_environ=xhr_header,)
+ "close_pull_request": "1",
+ "text": "Closing a PR",
+ "csrf_token": csrf_token,
+ },
+ extra_environ=xhr_header,
+ )
- journal = UserLog.query()\
- .filter(UserLog.user_id == author)\
- .filter(UserLog.repository_id == repo) \
- .order_by(UserLog.user_log_id.asc()) \
+ journal = (
+ UserLog.query()
+ .filter(UserLog.user_id == author)
+ .filter(UserLog.repository_id == repo)
+ .order_by(UserLog.user_log_id.asc())
.all()
- assert journal[-1].action == 'repo.pull_request.close'
+ )
+ assert journal[-1].action == "repo.pull_request.close"
pull_request = PullRequest.get(pull_request_id)
assert pull_request.is_closed()
status = ChangesetStatusModel().get_status(
- pull_request.source_repo, pull_request=pull_request)
+ pull_request.source_repo, pull_request=pull_request
+ )
assert status == ChangesetStatus.STATUS_APPROVED
- comments = ChangesetComment().query() \
- .filter(ChangesetComment.pull_request == pull_request) \
- .order_by(ChangesetComment.comment_id.asc())\
+ comments = (
+ ChangesetComment()
+ .query()
+ .filter(ChangesetComment.pull_request == pull_request)
+ .order_by(ChangesetComment.comment_id.asc())
.all()
- assert comments[-1].text == 'Closing a PR'
+ )
+ assert comments[-1].text == "Closing a PR"
def test_comment_force_close_pull_request_rejected(
- self, pr_util, csrf_token, xhr_header):
+ self, pr_util, csrf_token, xhr_header
+ ):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
PullRequestModel().update_reviewers(
- pull_request_id, [
- (1, ['reason'], False, 'reviewer', []),
- (2, ['reason2'], False, 'reviewer', [])],
- pull_request.author)
+ pull_request_id,
+ [
+ (1, ["reason"], False, "reviewer", []),
+ (2, ["reason2"], False, "reviewer", []),
+ ],
+ pull_request.author,
+ )
author = pull_request.user_id
repo = pull_request.target_repo.repo_id
self.app.post(
- route_path('pullrequest_comment_create',
+ route_path(
+ "pullrequest_comment_create",
repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request_id),
- params={
- 'close_pull_request': '1',
- 'csrf_token': csrf_token},
- extra_environ=xhr_header)
+ pull_request_id=pull_request_id,
+ ),
+ params={"close_pull_request": "1", "csrf_token": csrf_token},
+ extra_environ=xhr_header,
+ )
pull_request = PullRequest.get(pull_request_id)
- journal = UserLog.query()\
- .filter(UserLog.user_id == author, UserLog.repository_id == repo) \
- .order_by(UserLog.user_log_id.asc()) \
+ journal = (
+ UserLog.query()
+ .filter(UserLog.user_id == author, UserLog.repository_id == repo)
+ .order_by(UserLog.user_log_id.asc())
.all()
- assert journal[-1].action == 'repo.pull_request.close'
+ )
+ assert journal[-1].action == "repo.pull_request.close"
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
- pull_request.source_repo, pull_request=pull_request)
+ pull_request.source_repo, pull_request=pull_request
+ )
assert status == ChangesetStatus.STATUS_REJECTED
- def test_comment_and_close_pull_request(
- self, pr_util, csrf_token, xhr_header):
+ def test_comment_and_close_pull_request(self, pr_util, csrf_token, xhr_header):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
response = self.app.post(
- route_path('pullrequest_comment_create',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id),
- params={
- 'close_pull_request': 'true',
- 'csrf_token': csrf_token},
- extra_environ=xhr_header)
+ route_path(
+ "pullrequest_comment_create",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ ),
+ params={"close_pull_request": "true", "csrf_token": csrf_token},
+ extra_environ=xhr_header,
+ )
assert response.json
@@ -567,11 +683,12 @@ class TestPullrequestsView(object):
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
- pull_request.source_repo, pull_request=pull_request)
+ pull_request.source_repo, pull_request=pull_request
+ )
assert status == ChangesetStatus.STATUS_REJECTED
def test_comment_and_close_pull_request_try_edit_comment(
- self, pr_util, csrf_token, xhr_header
+ self, pr_util, csrf_token, xhr_header
):
pull_request = pr_util.create_pull_request()
pull_request_id = pull_request.pull_request_id
@@ -580,15 +697,16 @@ class TestPullrequestsView(object):
response = self.app.post(
route_path(
- 'pullrequest_comment_create',
+ "pullrequest_comment_create",
repo_name=target_scm_name,
pull_request_id=pull_request_id,
),
params={
- 'close_pull_request': 'true',
- 'csrf_token': csrf_token,
+ "close_pull_request": "true",
+ "csrf_token": csrf_token,
},
- extra_environ=xhr_header)
+ extra_environ=xhr_header,
+ )
assert response.json
@@ -599,22 +717,23 @@ class TestPullrequestsView(object):
# check only the latest status, not the review status
status = ChangesetStatusModel().get_status(
- pull_request.source_repo, pull_request=pull_request)
+ pull_request.source_repo, pull_request=pull_request
+ )
assert status == ChangesetStatus.STATUS_REJECTED
for comment_id in response.json.keys():
- test_text = 'test'
+ test_text = "test"
response = self.app.post(
route_path(
- 'pullrequest_comment_edit',
+ "pullrequest_comment_edit",
repo_name=target_scm_name,
pull_request_id=pull_request_id,
comment_id=comment_id,
),
extra_environ=xhr_header,
params={
- 'csrf_token': csrf_token,
- 'text': test_text,
+ "csrf_token": csrf_token,
+ "text": test_text,
},
status=403,
)
@@ -627,12 +746,13 @@ class TestPullrequestsView(object):
response = self.app.post(
route_path(
- 'pullrequest_comment_create',
+ "pullrequest_comment_create",
repo_name=target_scm_name,
- pull_request_id=pull_request.pull_request_id),
+ pull_request_id=pull_request.pull_request_id,
+ ),
params={
- 'csrf_token': csrf_token,
- 'text': 'init',
+ "csrf_token": csrf_token,
+ "text": "init",
},
extra_environ=xhr_header,
)
@@ -640,24 +760,27 @@ class TestPullrequestsView(object):
for comment_id in response.json.keys():
assert comment_id
- test_text = 'test'
+ test_text = "test"
self.app.post(
route_path(
- 'pullrequest_comment_edit',
+ "pullrequest_comment_edit",
repo_name=target_scm_name,
pull_request_id=pull_request.pull_request_id,
comment_id=comment_id,
),
extra_environ=xhr_header,
params={
- 'csrf_token': csrf_token,
- 'text': test_text,
- 'version': '0',
+ "csrf_token": csrf_token,
+ "text": test_text,
+ "version": "0",
},
-
)
- text_form_db = ChangesetComment.query().filter(
- ChangesetComment.comment_id == comment_id).first().text
+ text_form_db = (
+ ChangesetComment.query()
+ .filter(ChangesetComment.comment_id == comment_id)
+ .first()
+ .text
+ )
assert test_text == text_form_db
def test_comment_and_comment_edit_special(self, pr_util, csrf_token, xhr_header):
@@ -667,34 +790,34 @@ class TestPullrequestsView(object):
response = self.app.post(
route_path(
- 'pullrequest_comment_create',
+ "pullrequest_comment_create",
repo_name=target_scm_name,
- pull_request_id=pull_request.pull_request_id),
+ pull_request_id=pull_request.pull_request_id,
+ ),
params={
- 'csrf_token': csrf_token,
- 'text': 'init',
+ "csrf_token": csrf_token,
+ "text": "init",
},
extra_environ=xhr_header,
)
assert response.json
for comment_id in response.json.keys():
- test_text = 'init'
+ test_text = "init"
response = self.app.post(
route_path(
- 'pullrequest_comment_edit',
+ "pullrequest_comment_edit",
repo_name=target_scm_name,
pull_request_id=pull_request.pull_request_id,
comment_id=comment_id,
),
extra_environ=xhr_header,
params={
- 'csrf_token': csrf_token,
- 'text': test_text,
- 'version': '0',
+ "csrf_token": csrf_token,
+ "text": test_text,
+ "version": "0",
},
status=404,
-
)
assert response.status_int == 404
@@ -705,79 +828,90 @@ class TestPullrequestsView(object):
response = self.app.post(
route_path(
- 'pullrequest_comment_create',
+ "pullrequest_comment_create",
repo_name=target_scm_name,
- pull_request_id=pull_request.pull_request_id),
+ pull_request_id=pull_request.pull_request_id,
+ ),
params={
- 'csrf_token': csrf_token,
- 'text': 'init',
+ "csrf_token": csrf_token,
+ "text": "init",
},
extra_environ=xhr_header,
)
assert response.json
for comment_id in response.json.keys():
- test_text = 'test'
+ test_text = "test"
self.app.post(
route_path(
- 'pullrequest_comment_edit',
+ "pullrequest_comment_edit",
repo_name=target_scm_name,
pull_request_id=pull_request.pull_request_id,
comment_id=comment_id,
),
extra_environ=xhr_header,
params={
- 'csrf_token': csrf_token,
- 'text': test_text,
- 'version': '0',
+ "csrf_token": csrf_token,
+ "text": test_text,
+ "version": "0",
},
-
)
- test_text_v2 = 'test_v2'
+ test_text_v2 = "test_v2"
response = self.app.post(
route_path(
- 'pullrequest_comment_edit',
+ "pullrequest_comment_edit",
repo_name=target_scm_name,
pull_request_id=pull_request.pull_request_id,
comment_id=comment_id,
),
extra_environ=xhr_header,
params={
- 'csrf_token': csrf_token,
- 'text': test_text_v2,
- 'version': '0',
+ "csrf_token": csrf_token,
+ "text": test_text_v2,
+ "version": "0",
},
status=409,
)
assert response.status_int == 409
- text_form_db = ChangesetComment.query().filter(
- ChangesetComment.comment_id == comment_id).first().text
+ text_form_db = (
+ ChangesetComment.query()
+ .filter(ChangesetComment.comment_id == comment_id)
+ .first()
+ .text
+ )
assert test_text == text_form_db
assert test_text_v2 != text_form_db
def test_comment_and_comment_edit_permissions_forbidden(
- self, autologin_regular_user, user_regular, user_admin, pr_util,
- csrf_token, xhr_header):
+ self,
+ autologin_regular_user,
+ user_regular,
+ user_admin,
+ pr_util,
+ csrf_token,
+ xhr_header,
+ ):
pull_request = pr_util.create_pull_request(
- author=user_admin.username, enable_notifications=False)
+ author=user_admin.username, enable_notifications=False
+ )
comment = CommentsModel().create(
- text='test',
+ text="test",
repo=pull_request.target_repo.scm_instance().name,
user=user_admin,
pull_request=pull_request,
)
response = self.app.post(
route_path(
- 'pullrequest_comment_edit',
+ "pullrequest_comment_edit",
repo_name=pull_request.target_repo.scm_instance().name,
pull_request_id=pull_request.pull_request_id,
comment_id=comment.comment_id,
),
extra_environ=xhr_header,
params={
- 'csrf_token': csrf_token,
- 'text': 'test_text',
+ "csrf_token": csrf_token,
+ "text": "test_text",
},
status=403,
)
@@ -785,231 +919,254 @@ class TestPullrequestsView(object):
def test_create_pull_request(self, backend, csrf_token):
commits = [
- {'message': 'ancestor'},
- {'message': 'change'},
- {'message': 'change2'},
+ {"message": "ancestor"},
+ {"message": "change"},
+ {"message": "change2"},
]
commit_ids = backend.create_master_repo(commits)
- target = backend.create_repo(heads=['ancestor'])
- source = backend.create_repo(heads=['change2'])
+ target = backend.create_repo(heads=["ancestor"])
+ source = backend.create_repo(heads=["change2"])
response = self.app.post(
- route_path('pullrequest_create', repo_name=source.repo_name),
+ route_path("pullrequest_create", repo_name=source.repo_name),
[
- ('source_repo', source.repo_name),
- ('source_ref', 'branch:default:' + commit_ids['change2']),
- ('target_repo', target.repo_name),
- ('target_ref', 'branch:default:' + commit_ids['ancestor']),
- ('common_ancestor', commit_ids['ancestor']),
- ('pullrequest_title', 'Title'),
- ('pullrequest_desc', 'Description'),
- ('description_renderer', 'markdown'),
- ('__start__', 'review_members:sequence'),
- ('__start__', 'reviewer:mapping'),
- ('user_id', '1'),
- ('__start__', 'reasons:sequence'),
- ('reason', 'Some reason'),
- ('__end__', 'reasons:sequence'),
- ('__start__', 'rules:sequence'),
- ('__end__', 'rules:sequence'),
- ('mandatory', 'False'),
- ('__end__', 'reviewer:mapping'),
- ('__end__', 'review_members:sequence'),
- ('__start__', 'revisions:sequence'),
- ('revisions', commit_ids['change']),
- ('revisions', commit_ids['change2']),
- ('__end__', 'revisions:sequence'),
- ('user', ''),
- ('csrf_token', csrf_token),
+ ("source_repo", source.repo_name),
+ ("source_ref", "branch:default:" + commit_ids["change2"]),
+ ("target_repo", target.repo_name),
+ ("target_ref", "branch:default:" + commit_ids["ancestor"]),
+ ("common_ancestor", commit_ids["ancestor"]),
+ ("pullrequest_title", "Title"),
+ ("pullrequest_desc", "Description"),
+ ("description_renderer", "markdown"),
+ ("__start__", "review_members:sequence"),
+ ("__start__", "reviewer:mapping"),
+ ("user_id", "1"),
+ ("__start__", "reasons:sequence"),
+ ("reason", "Some reason"),
+ ("__end__", "reasons:sequence"),
+ ("__start__", "rules:sequence"),
+ ("__end__", "rules:sequence"),
+ ("mandatory", "False"),
+ ("__end__", "reviewer:mapping"),
+ ("__end__", "review_members:sequence"),
+ ("__start__", "revisions:sequence"),
+ ("revisions", commit_ids["change"]),
+ ("revisions", commit_ids["change2"]),
+ ("__end__", "revisions:sequence"),
+ ("user", ""),
+ ("csrf_token", csrf_token),
],
- status=302)
+ status=302,
+ )
- location = response.headers['Location']
- pull_request_id = location.rsplit('/', 1)[1]
- assert pull_request_id != 'new'
+ location = response.headers["Location"]
+ pull_request_id = location.rsplit("/", 1)[1]
+ assert pull_request_id != "new"
pull_request = PullRequest.get(int(pull_request_id))
# check that we have now both revisions
- assert pull_request.revisions == [commit_ids['change2'], commit_ids['change']]
- assert pull_request.source_ref == 'branch:default:' + commit_ids['change2']
- expected_target_ref = 'branch:default:' + commit_ids['ancestor']
+ assert pull_request.revisions == [commit_ids["change2"], commit_ids["change"]]
+ assert pull_request.source_ref == "branch:default:" + commit_ids["change2"]
+ expected_target_ref = "branch:default:" + commit_ids["ancestor"]
assert pull_request.target_ref == expected_target_ref
def test_reviewer_notifications(self, backend, csrf_token):
- # We have to use the app.post for this test so it will create the
+ # We have to use the app.post for this test, so it will create the
# notifications properly with the new PR
commits = [
- {'message': 'ancestor',
- 'added': [FileNode(b'file_A', content=b'content_of_ancestor')]},
- {'message': 'change',
- 'added': [FileNode(b'file_a', content=b'content_of_change')]},
- {'message': 'change-child'},
- {'message': 'ancestor-child', 'parents': ['ancestor'],
- 'added': [ FileNode(b'file_B', content=b'content_of_ancestor_child')]},
- {'message': 'ancestor-child-2'},
+ {
+ "message": "ancestor",
+ "added": [FileNode(b"file_A", content=b"content_of_ancestor")],
+ },
+ {
+ "message": "change",
+ "added": [FileNode(b"file_a", content=b"content_of_change")],
+ },
+ {"message": "change-child"},
+ {
+ "message": "ancestor-child",
+ "parents": ["ancestor"],
+ "branch": "feature",
+ "added": [FileNode(b"file_c", content=b"content_of_ancestor_child")],
+ },
+ {"message": "ancestor-child-2", "branch": "feature"},
]
commit_ids = backend.create_master_repo(commits)
- target = backend.create_repo(heads=['ancestor-child'])
- source = backend.create_repo(heads=['change'])
+ target = backend.create_repo(heads=["ancestor-child"])
+ source = backend.create_repo(heads=["change"])
response = self.app.post(
- route_path('pullrequest_create', repo_name=source.repo_name),
+ route_path("pullrequest_create", repo_name=source.repo_name),
[
- ('source_repo', source.repo_name),
- ('source_ref', 'branch:default:' + commit_ids['change']),
- ('target_repo', target.repo_name),
- ('target_ref', 'branch:default:' + commit_ids['ancestor-child']),
- ('common_ancestor', commit_ids['ancestor']),
- ('pullrequest_title', 'Title'),
- ('pullrequest_desc', 'Description'),
- ('description_renderer', 'markdown'),
- ('__start__', 'review_members:sequence'),
- ('__start__', 'reviewer:mapping'),
- ('user_id', '2'),
- ('__start__', 'reasons:sequence'),
- ('reason', 'Some reason'),
- ('__end__', 'reasons:sequence'),
- ('__start__', 'rules:sequence'),
- ('__end__', 'rules:sequence'),
- ('mandatory', 'False'),
- ('__end__', 'reviewer:mapping'),
- ('__end__', 'review_members:sequence'),
- ('__start__', 'revisions:sequence'),
- ('revisions', commit_ids['change']),
- ('__end__', 'revisions:sequence'),
- ('user', ''),
- ('csrf_token', csrf_token),
+ ("source_repo", source.repo_name),
+ ("source_ref", "branch:default:" + commit_ids["change"]),
+ ("target_repo", target.repo_name),
+ ("target_ref", "branch:default:" + commit_ids["ancestor-child"]),
+ ("common_ancestor", commit_ids["ancestor"]),
+ ("pullrequest_title", "Title"),
+ ("pullrequest_desc", "Description"),
+ ("description_renderer", "markdown"),
+ ("__start__", "review_members:sequence"),
+ ("__start__", "reviewer:mapping"),
+ ("user_id", "2"),
+ ("__start__", "reasons:sequence"),
+ ("reason", "Some reason"),
+ ("__end__", "reasons:sequence"),
+ ("__start__", "rules:sequence"),
+ ("__end__", "rules:sequence"),
+ ("mandatory", "False"),
+ ("__end__", "reviewer:mapping"),
+ ("__end__", "review_members:sequence"),
+ ("__start__", "revisions:sequence"),
+ ("revisions", commit_ids["change"]),
+ ("__end__", "revisions:sequence"),
+ ("user", ""),
+ ("csrf_token", csrf_token),
],
- status=302)
+ status=302,
+ )
- location = response.headers['Location']
+ location = response.headers["Location"]
- pull_request_id = location.rsplit('/', 1)[1]
- assert pull_request_id != 'new'
+ pull_request_id = location.rsplit("/", 1)[1]
+ assert pull_request_id != "new"
pull_request = PullRequest.get(int(pull_request_id))
# Check that a notification was made
- notifications = Notification.query()\
- .filter(Notification.created_by == pull_request.author.user_id,
- Notification.type_ == Notification.TYPE_PULL_REQUEST,
- Notification.subject.contains(
- "requested a pull request review. !%s" % pull_request_id))
+ notifications = Notification.query().filter(
+ Notification.created_by == pull_request.author.user_id,
+ Notification.type_ == Notification.TYPE_PULL_REQUEST,
+ Notification.subject.contains(
+ "requested a pull request review. !%s" % pull_request_id
+ ),
+ )
assert len(notifications.all()) == 1
# Change reviewers and check that a notification was made
PullRequestModel().update_reviewers(
- pull_request.pull_request_id, [
- (1, [], False, 'reviewer', [])
- ],
- pull_request.author)
+ pull_request.pull_request_id,
+ [(1, [], False, "reviewer", [])],
+ pull_request.author,
+ )
assert len(notifications.all()) == 2
def test_create_pull_request_stores_ancestor_commit_id(self, backend, csrf_token):
commits = [
- {'message': 'ancestor',
- 'added': [FileNode(b'file_A', content=b'content_of_ancestor')]},
- {'message': 'change',
- 'added': [FileNode(b'file_a', content=b'content_of_change')]},
- {'message': 'change-child'},
- {'message': 'ancestor-child', 'parents': ['ancestor'],
- 'added': [
- FileNode(b'file_B', content=b'content_of_ancestor_child')]},
- {'message': 'ancestor-child-2'},
+ {
+ "message": "ancestor",
+ "added": [FileNode(b"file_A", content=b"content_of_ancestor")],
+ },
+ {
+ "message": "change",
+ "added": [FileNode(b"file_a", content=b"content_of_change")],
+ },
+ {
+ "message": "change-child",
+ "added": [FileNode(b"file_c", content=b"content_of_change_2")],
+ },
+ {
+ "message": "ancestor-child",
+ "parents": ["ancestor"],
+ "branch": "feature",
+ "added": [FileNode(b"file_B", content=b"content_of_ancestor_child")],
+ },
+ {"message": "ancestor-child-2", "branch": "feature"},
]
commit_ids = backend.create_master_repo(commits)
- target = backend.create_repo(heads=['ancestor-child'])
- source = backend.create_repo(heads=['change'])
+ target = backend.create_repo(heads=["ancestor-child"])
+ source = backend.create_repo(heads=["change"])
response = self.app.post(
- route_path('pullrequest_create', repo_name=source.repo_name),
+ route_path("pullrequest_create", repo_name=source.repo_name),
[
- ('source_repo', source.repo_name),
- ('source_ref', 'branch:default:' + commit_ids['change']),
- ('target_repo', target.repo_name),
- ('target_ref', 'branch:default:' + commit_ids['ancestor-child']),
- ('common_ancestor', commit_ids['ancestor']),
- ('pullrequest_title', 'Title'),
- ('pullrequest_desc', 'Description'),
- ('description_renderer', 'markdown'),
- ('__start__', 'review_members:sequence'),
- ('__start__', 'reviewer:mapping'),
- ('user_id', '1'),
- ('__start__', 'reasons:sequence'),
- ('reason', 'Some reason'),
- ('__end__', 'reasons:sequence'),
- ('__start__', 'rules:sequence'),
- ('__end__', 'rules:sequence'),
- ('mandatory', 'False'),
- ('__end__', 'reviewer:mapping'),
- ('__end__', 'review_members:sequence'),
- ('__start__', 'revisions:sequence'),
- ('revisions', commit_ids['change']),
- ('__end__', 'revisions:sequence'),
- ('user', ''),
- ('csrf_token', csrf_token),
+ ("source_repo", source.repo_name),
+ ("source_ref", "branch:default:" + commit_ids["change"]),
+ ("target_repo", target.repo_name),
+ ("target_ref", "branch:default:" + commit_ids["ancestor-child"]),
+ ("common_ancestor", commit_ids["ancestor"]),
+ ("pullrequest_title", "Title"),
+ ("pullrequest_desc", "Description"),
+ ("description_renderer", "markdown"),
+ ("__start__", "review_members:sequence"),
+ ("__start__", "reviewer:mapping"),
+ ("user_id", "1"),
+ ("__start__", "reasons:sequence"),
+ ("reason", "Some reason"),
+ ("__end__", "reasons:sequence"),
+ ("__start__", "rules:sequence"),
+ ("__end__", "rules:sequence"),
+ ("mandatory", "False"),
+ ("__end__", "reviewer:mapping"),
+ ("__end__", "review_members:sequence"),
+ ("__start__", "revisions:sequence"),
+ ("revisions", commit_ids["change"]),
+ ("__end__", "revisions:sequence"),
+ ("user", ""),
+ ("csrf_token", csrf_token),
],
- status=302)
+ status=302,
+ )
- location = response.headers['Location']
+ location = response.headers["Location"]
- pull_request_id = location.rsplit('/', 1)[1]
- assert pull_request_id != 'new'
+ pull_request_id = location.rsplit("/", 1)[1]
+ assert pull_request_id != "new"
pull_request = PullRequest.get(int(pull_request_id))
# target_ref has to point to the ancestor's commit_id in order to
# show the correct diff
- expected_target_ref = 'branch:default:' + commit_ids['ancestor']
+ expected_target_ref = "branch:default:" + commit_ids["ancestor"]
assert pull_request.target_ref == expected_target_ref
# Check generated diff contents
response = response.follow()
- response.mustcontain(no=['content_of_ancestor'])
- response.mustcontain(no=['content_of_ancestor-child'])
- response.mustcontain('content_of_change')
+ response.mustcontain(no=["content_of_ancestor"])
+ response.mustcontain(no=["content_of_ancestor-child"])
+ response.mustcontain("content_of_change")
def test_merge_pull_request_enabled(self, pr_util, csrf_token):
# Clear any previous calls to rcextensions
rhodecode.EXTENSIONS.calls.clear()
- pull_request = pr_util.create_pull_request(
- approved=True, mergeable=True)
+ pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
pull_request_id = pull_request.pull_request_id
- repo_name = pull_request.target_repo.scm_instance().name,
+ repo_name = (pull_request.target_repo.scm_instance().name,)
- url = route_path('pullrequest_merge',
- repo_name=str(repo_name[0]),
- pull_request_id=pull_request_id)
- response = self.app.post(url, params={'csrf_token': csrf_token}).follow()
+ url = route_path(
+ "pullrequest_merge",
+ repo_name=str(repo_name[0]),
+ pull_request_id=pull_request_id,
+ )
+ response = self.app.post(url, params={"csrf_token": csrf_token}).follow()
pull_request = PullRequest.get(pull_request_id)
assert response.status_int == 200
assert pull_request.is_closed()
- assert_pull_request_status(
- pull_request, ChangesetStatus.STATUS_APPROVED)
+ assert_pull_request_status(pull_request, ChangesetStatus.STATUS_APPROVED)
# Check the relevant log entries were added
user_logs = UserLog.query().order_by(UserLog.user_log_id.desc()).limit(3)
actions = [log.action for log in user_logs]
pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
expected_actions = [
- 'repo.pull_request.close',
- 'repo.pull_request.merge',
- 'repo.pull_request.comment.create'
+ "repo.pull_request.close",
+ "repo.pull_request.merge",
+ "repo.pull_request.comment.create",
]
assert actions == expected_actions
user_logs = UserLog.query().order_by(UserLog.user_log_id.desc()).limit(4)
actions = [log for log in user_logs]
- assert actions[-1].action == 'user.push'
- assert actions[-1].action_data['commit_ids'] == pr_commit_ids
+ assert actions[-1].action == "user.push"
+ assert actions[-1].action_data["commit_ids"] == pr_commit_ids
# Check post_push rcextension was really executed
- push_calls = rhodecode.EXTENSIONS.calls['_push_hook']
+ push_calls = rhodecode.EXTENSIONS.calls["_push_hook"]
assert len(push_calls) == 1
unused_last_call_args, last_call_kwargs = push_calls[0]
- assert last_call_kwargs['action'] == 'push'
- assert last_call_kwargs['commit_ids'] == pr_commit_ids
+ assert last_call_kwargs["action"] == "push"
+ assert last_call_kwargs["commit_ids"] == pr_commit_ids
def test_merge_pull_request_disabled(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(mergeable=False)
@@ -1017,82 +1174,106 @@ class TestPullrequestsView(object):
pull_request = PullRequest.get(pull_request_id)
response = self.app.post(
- route_path('pullrequest_merge',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id),
- params={'csrf_token': csrf_token}).follow()
+ route_path(
+ "pullrequest_merge",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ ),
+ params={"csrf_token": csrf_token},
+ ).follow()
assert response.status_int == 200
response.mustcontain(
- 'Merge is not currently possible because of below failed checks.')
- response.mustcontain('Server-side pull request merging is disabled.')
+ "Merge is not currently possible because of below failed checks."
+ )
+ response.mustcontain("Server-side pull request merging is disabled.")
- @pytest.mark.skip_backends('svn')
+ @pytest.mark.skip_backends("svn")
def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
pull_request = pr_util.create_pull_request(mergeable=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name
response = self.app.post(
- route_path('pullrequest_merge',
- repo_name=repo_name, pull_request_id=pull_request_id),
- params={'csrf_token': csrf_token}).follow()
+ route_path(
+ "pullrequest_merge",
+ repo_name=repo_name,
+ pull_request_id=pull_request_id,
+ ),
+ params={"csrf_token": csrf_token},
+ ).follow()
assert response.status_int == 200
response.mustcontain(
- 'Merge is not currently possible because of below failed checks.')
- response.mustcontain('Pull request reviewer approval is pending.')
+ "Merge is not currently possible because of below failed checks."
+ )
+ response.mustcontain("Pull request reviewer approval is pending.")
def test_merge_pull_request_renders_failure_reason(
- self, user_regular, csrf_token, pr_util):
+ self, user_regular, csrf_token, pr_util
+ ):
pull_request = pr_util.create_pull_request(mergeable=True, approved=True)
pull_request_id = pull_request.pull_request_id
repo_name = pull_request.target_repo.scm_instance().name
- merge_resp = MergeResponse(True, False, 'STUB_COMMIT_ID',
- MergeFailureReason.PUSH_FAILED,
- metadata={'target': 'shadow repo',
- 'merge_commit': 'xxx'})
+ merge_resp = MergeResponse(
+ True,
+ False,
+ Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"),
+ MergeFailureReason.PUSH_FAILED,
+ metadata={"target": "shadow repo", "merge_commit": "xxx"},
+ )
model_patcher = mock.patch.multiple(
PullRequestModel,
merge_repo=mock.Mock(return_value=merge_resp),
- merge_status=mock.Mock(return_value=(None, True, 'WRONG_MESSAGE')))
+ merge_status=mock.Mock(return_value=(None, True, "WRONG_MESSAGE")),
+ )
with model_patcher:
response = self.app.post(
- route_path('pullrequest_merge',
- repo_name=repo_name,
- pull_request_id=pull_request_id),
- params={'csrf_token': csrf_token}, status=302)
+ route_path(
+ "pullrequest_merge",
+ repo_name=repo_name,
+ pull_request_id=pull_request_id,
+ ),
+ params={"csrf_token": csrf_token},
+ status=302,
+ )
- merge_resp = MergeResponse(True, True, '', MergeFailureReason.PUSH_FAILED,
- metadata={'target': 'shadow repo',
- 'merge_commit': 'xxx'})
+ merge_resp = MergeResponse(
+ True,
+ True,
+ Reference("commit", "STUB_COMMIT_ID", "STUB_COMMIT_ID"),
+ MergeFailureReason.PUSH_FAILED,
+ metadata={"target": "shadow repo", "merge_commit": "xxx"},
+ )
assert_session_flash(response, merge_resp.merge_status_message)
def test_update_source_revision(self, backend, csrf_token):
commits = [
- {'message': 'ancestor'},
- {'message': 'change'},
- {'message': 'change-2'},
+ {"message": "ancestor"},
+ {"message": "change"},
+ {"message": "change-2"},
]
commit_ids = backend.create_master_repo(commits)
- target = backend.create_repo(heads=['ancestor'])
- source = backend.create_repo(heads=['change'])
+ target = backend.create_repo(heads=["ancestor"])
+ source = backend.create_repo(heads=["change"])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
- pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['change'])
+ pull_request.source_ref = "branch:{branch}:{commit_id}".format(
+ branch=backend.default_branch_name, commit_id=commit_ids["change"]
+ )
pull_request.target_repo = target
- pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['ancestor'])
+ pull_request.target_ref = "branch:{branch}:{commit_id}".format(
+ branch=backend.default_branch_name, commit_id=commit_ids["ancestor"]
+ )
- pull_request.revisions = [commit_ids['change']]
+ pull_request.revisions = [commit_ids["change"]]
pull_request.title = "Test"
pull_request.description = "Description"
pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
@@ -1102,51 +1283,67 @@ class TestPullrequestsView(object):
pull_request_id = pull_request.pull_request_id
# source has ancestor - change - change-2
- backend.pull_heads(source, heads=['change-2'])
+ backend.pull_heads(source, heads=["change-2"])
target_repo_name = target.repo_name
# update PR
self.app.post(
- route_path('pullrequest_update',
- repo_name=target_repo_name, pull_request_id=pull_request_id),
- params={'update_commits': 'true', 'csrf_token': csrf_token})
+ route_path(
+ "pullrequest_update",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ ),
+ params={"update_commits": "true", "csrf_token": csrf_token},
+ )
response = self.app.get(
- route_path('pullrequest_show',
- repo_name=target_repo_name,
- pull_request_id=pull_request.pull_request_id))
+ route_path(
+ "pullrequest_show",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
assert response.status_int == 200
- response.mustcontain('Pull request updated to')
- response.mustcontain('with 1 added, 0 removed commits.')
+ response.mustcontain("Pull request updated to")
+ response.mustcontain("with 1 added, 0 removed commits.")
# check that we have now both revisions
pull_request = PullRequest.get(pull_request_id)
- assert pull_request.revisions == [commit_ids['change-2'], commit_ids['change']]
+ assert pull_request.revisions == [commit_ids["change-2"], commit_ids["change"]]
def test_update_target_revision(self, backend, csrf_token):
+ """
+ Checks when we add more commits into a target branch, and update PR
+ """
+
commits = [
- {'message': 'ancestor'},
- {'message': 'change'},
- {'message': 'ancestor-new', 'parents': ['ancestor']},
- {'message': 'change-rebased'},
+ {"message": "commit-a"}, # main branch (our PR target)
+ {"message": "commit-b"}, # Initial source
+ {"message": "commit-c"},
+
+ {"message": "commit-a-prime", "branch": "feature", "parents": ["commit-a"]}, # main branch (source)
]
+
commit_ids = backend.create_master_repo(commits)
- target = backend.create_repo(heads=['ancestor'])
- source = backend.create_repo(heads=['change'])
+ target = backend.create_repo(heads=["commit-a"])
+ source = backend.create_repo(heads=["commit-b"])
+ target_repo_name = target.repo_name
- # create pr from a in source to A in target
+ # create pr from commit-b to commit-a
pull_request = PullRequest()
- pull_request.source_repo = source
- pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['change'])
+ pull_request.target_repo = target
+ pull_request.target_ref = "branch:{branch}:{commit_id}".format(
+ branch=backend.default_branch_name, commit_id=commit_ids["commit-a"]
+ )
- pull_request.target_repo = target
- pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['ancestor'])
+ pull_request.source_repo = source
+ pull_request.source_ref = "branch:{branch}:{commit_id}".format(
+ branch=backend.default_branch_name, commit_id=commit_ids["commit-b"]
+ )
- pull_request.revisions = [commit_ids['change']]
+ pull_request.revisions = [commit_ids["commit-b"]]
pull_request.title = "Test"
pull_request.description = "Description"
pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
@@ -1156,63 +1353,76 @@ class TestPullrequestsView(object):
Session().commit()
pull_request_id = pull_request.pull_request_id
- # target has ancestor - ancestor-new
+ # target - add one commit on top commit-a -> commit-b
+ backend.pull_heads(target, heads=["commit-b"])
+
# source has ancestor - ancestor-new - change-rebased
- backend.pull_heads(target, heads=['ancestor-new'])
- backend.pull_heads(source, heads=['change-rebased'])
- target_repo_name = target.repo_name
+ backend.pull_heads(source, heads=["commit-c"])
# update PR
- url = route_path('pullrequest_update',
- repo_name=target_repo_name,
- pull_request_id=pull_request_id)
- self.app.post(url,
- params={'update_commits': 'true', 'csrf_token': csrf_token},
- status=200)
+ url = route_path(
+ "pullrequest_update",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ )
+ self.app.post(
+ url, params={"update_commits": "true", "csrf_token": csrf_token}, status=200
+ )
# check that we have now both revisions
pull_request = PullRequest.get(pull_request_id)
- assert pull_request.revisions == [commit_ids['change-rebased']]
- assert pull_request.target_ref == 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['ancestor-new'])
+ assert pull_request.revisions == [commit_ids["commit-c"]]
+ assert pull_request.target_ref == "branch:{branch}:{commit_id}".format(
+ branch=backend.default_branch_name, commit_id=commit_ids["commit-b"]
+ )
response = self.app.get(
- route_path('pullrequest_show',
- repo_name=target_repo_name,
- pull_request_id=pull_request.pull_request_id))
+ route_path(
+ "pullrequest_show",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
assert response.status_int == 200
- response.mustcontain('Pull request updated to')
- response.mustcontain('with 1 added, 1 removed commits.')
+ response.mustcontain("Pull request updated to")
+ response.mustcontain("with 1 added, 1 removed commits.")
- def test_update_target_revision_with_removal_of_1_commit_git(self, backend_git, csrf_token):
+ def test_update_target_revision_with_removal_of_1_commit_git(
+ self, backend_git, csrf_token
+ ):
backend = backend_git
commits = [
- {'message': 'master-commit-1'},
- {'message': 'master-commit-2-change-1'},
- {'message': 'master-commit-3-change-2'},
-
- {'message': 'feat-commit-1', 'parents': ['master-commit-1']},
- {'message': 'feat-commit-2'},
+ {"message": "master-commit-1"},
+ {"message": "master-commit-2-change-1"},
+ {"message": "master-commit-3-change-2"},
+ {
+ "message": "feat-commit-1",
+ "parents": ["master-commit-1"],
+ "branch": "feature",
+ },
+ {"message": "feat-commit-2", "branch": "feature"},
]
commit_ids = backend.create_master_repo(commits)
- target = backend.create_repo(heads=['master-commit-3-change-2'])
- source = backend.create_repo(heads=['feat-commit-2'])
+ target = backend.create_repo(heads=["master-commit-3-change-2"])
+ source = backend.create_repo(heads=["feat-commit-2"])
# create pr from a in source to A in target
pull_request = PullRequest()
pull_request.source_repo = source
- pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
+ pull_request.source_ref = "branch:{branch}:{commit_id}".format(
branch=backend.default_branch_name,
- commit_id=commit_ids['master-commit-3-change-2'])
+ commit_id=commit_ids["master-commit-3-change-2"],
+ )
pull_request.target_repo = target
- pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['feat-commit-2'])
+ pull_request.target_ref = "branch:{branch}:{commit_id}".format(
+ branch=backend.default_branch_name, commit_id=commit_ids["feat-commit-2"]
+ )
pull_request.revisions = [
- commit_ids['feat-commit-1'],
- commit_ids['feat-commit-2']
+ commit_ids["feat-commit-1"],
+ commit_ids["feat-commit-2"],
]
pull_request.title = "Test"
pull_request.description = "Description"
@@ -1225,80 +1435,99 @@ class TestPullrequestsView(object):
# PR is created, now we simulate a force-push into target,
# that drops a 2 last commits
vcsrepo = target.scm_instance()
- vcsrepo.config.clear_section('hooks')
- vcsrepo.run_git_command(['reset', '--soft', 'HEAD~2'])
+ vcsrepo.config.clear_section("hooks")
+ vcsrepo.run_git_command(["reset", "--soft", "HEAD~2"])
target_repo_name = target.repo_name
# update PR
- url = route_path('pullrequest_update',
- repo_name=target_repo_name,
- pull_request_id=pull_request_id)
- self.app.post(url,
- params={'update_commits': 'true', 'csrf_token': csrf_token},
- status=200)
+ url = route_path(
+ "pullrequest_update",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ )
+ self.app.post(
+ url, params={"update_commits": "true", "csrf_token": csrf_token}, status=200
+ )
- response = self.app.get(route_path('pullrequest_new', repo_name=target_repo_name))
+ response = self.app.get(
+ route_path("pullrequest_new", repo_name=target_repo_name)
+ )
assert response.status_int == 200
- response.mustcontain('Pull request updated to')
- response.mustcontain('with 0 added, 0 removed commits.')
+ response.mustcontain("Pull request updated to")
+ response.mustcontain("with 0 added, 0 removed commits.")
- def test_update_of_ancestor_reference(self, backend, csrf_token):
+ def test_update_pr_ancestor_reference(self, csrf_token, pr_util: PRTestUtility):
commits = [
- {'message': 'ancestor'},
- {'message': 'change'},
- {'message': 'change-2'},
- {'message': 'ancestor-new', 'parents': ['ancestor']},
- {'message': 'change-rebased'},
+ {"message": "ancestor"},
+ {"message": "change"},
+ {"message": "change-2"},
+
+ {"message": "ancestor-new", "parents": ["ancestor"], "branch": "feature"},
+ {"message": "change-rebased", "branch": "feature"},
]
- commit_ids = backend.create_master_repo(commits)
- target = backend.create_repo(heads=['ancestor'])
- source = backend.create_repo(heads=['change'])
- # create pr from a in source to A in target
- pull_request = PullRequest()
- pull_request.source_repo = source
+ pull_request = pr_util.create_pull_request(
+ commits,
+ target_head="ancestor",
+ source_head="change",
+ revisions=["change"],
+ )
+ pull_request_id = pull_request.pull_request_id
+ target_repo_name = pr_util.target_repository.repo_name
+ commit_ids = pr_util.commit_ids
+
+ assert pull_request.revisions == [commit_ids["change"]]
+ assert list(pull_request.target_repo.scm_instance(cache=False).branches.keys()) == [pr_util.backend.default_branch_name]
+ assert list(pull_request.source_repo.scm_instance(cache=False).branches.keys()) == [pr_util.backend.default_branch_name]
- pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['change'])
- pull_request.target_repo = target
- pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name, commit_id=commit_ids['ancestor'])
- pull_request.revisions = [commit_ids['change']]
- pull_request.title = "Test"
- pull_request.description = "Description"
- pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
- pull_request.pull_request_state = PullRequest.STATE_CREATED
- Session().add(pull_request)
+ branch = "feature"
+ pr_util.update_target_repository(head="ancestor-new", do_fetch=True)
+ pr_util.set_pr_target_ref(ref_type="branch", ref_name=branch, ref_commit_id=commit_ids["ancestor-new"])
+
+ pr_util.update_source_repository(head="change-rebased", do_fetch=True)
+ pr_util.set_pr_source_ref(ref_type="branch", ref_name=branch, ref_commit_id=commit_ids["change-rebased"])
+
+ assert list(pull_request.target_repo.scm_instance(cache=False).branches.keys()) == [pr_util.backend.default_branch_name, branch]
+ assert list(pull_request.source_repo.scm_instance(cache=False).branches.keys()) == [pr_util.backend.default_branch_name, branch]
+
+ Session().add(pr_util.pull_request)
Session().commit()
- pull_request_id = pull_request.pull_request_id
- # target has ancestor - ancestor-new
- # source has ancestor - ancestor-new - change-rebased
- backend.pull_heads(target, heads=['ancestor-new'])
- backend.pull_heads(source, heads=['change-rebased'])
- target_repo_name = target.repo_name
+ self.app.post(
+ route_path(
+ "pullrequest_update",
+ repo_name=target_repo_name,
+ pull_request_id=pull_request_id,
+ ),
+ params={"update_commits": "true", "csrf_token": csrf_token, "force_refresh": True},
+ status=200,
+ )
+
- # update PR
- self.app.post(
- route_path('pullrequest_update',
- repo_name=target_repo_name, pull_request_id=pull_request_id),
- params={'update_commits': 'true', 'csrf_token': csrf_token},
- status=200)
+ # response = self.app.get(
+ # route_path(
+ # "pullrequest_show", repo_name=target_repo_name, pull_request_id=pull_request_id,
+ # params={"force_refresh": True}
+ # ),
+ # )
+ #
+ # response.mustcontain("Pull request updated to")
+ # response.mustcontain("with 1 added, 0 removed commits.")
- # Expect the target reference to be updated correctly
pull_request = PullRequest.get(pull_request_id)
- assert pull_request.revisions == [commit_ids['change-rebased']]
- expected_target_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend.default_branch_name,
- commit_id=commit_ids['ancestor-new'])
- assert pull_request.target_ref == expected_target_ref
+
+ assert pull_request.target_ref == "branch:{branch}:{commit_id}".format(
+ branch="feature", commit_id=commit_ids["ancestor-new"])
+
+ assert pull_request.revisions == [commit_ids["change-rebased"]]
+
def test_remove_pull_request_branch(self, backend_git, csrf_token):
- branch_name = 'development'
+ branch_name = "development"
commits = [
- {'message': 'initial-commit'},
- {'message': 'old-feature'},
- {'message': 'new-feature', 'branch': branch_name},
+ {"message": "initial-commit"},
+ {"message": "old-feature"},
+ {"message": "new-feature", "branch": branch_name},
]
repo = backend_git.create_repo(commits)
repo_name = repo.repo_name
@@ -1307,11 +1536,13 @@ class TestPullrequestsView(object):
pull_request = PullRequest()
pull_request.source_repo = repo
pull_request.target_repo = repo
- pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
- branch=branch_name, commit_id=commit_ids['new-feature'])
- pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
- branch=backend_git.default_branch_name, commit_id=commit_ids['old-feature'])
- pull_request.revisions = [commit_ids['new-feature']]
+ pull_request.source_ref = "branch:{branch}:{commit_id}".format(
+ branch=branch_name, commit_id=commit_ids["new-feature"]
+ )
+ pull_request.target_ref = "branch:{branch}:{commit_id}".format(
+ branch=backend_git.default_branch_name, commit_id=commit_ids["old-feature"]
+ )
+ pull_request.revisions = [commit_ids["new-feature"]]
pull_request.title = "Test"
pull_request.description = "Description"
pull_request.author = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
@@ -1322,310 +1553,362 @@ class TestPullrequestsView(object):
pull_request_id = pull_request.pull_request_id
vcs = repo.scm_instance()
- vcs.remove_ref('refs/heads/{}'.format(branch_name))
+ vcs.remove_ref("refs/heads/{}".format(branch_name))
# NOTE(marcink): run GC to ensure the commits are gone
vcs.run_gc()
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=repo_name,
- pull_request_id=pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show", repo_name=repo_name, pull_request_id=pull_request_id
+ )
+ )
assert response.status_int == 200
response.assert_response().element_contains(
- '#changeset_compare_view_content .alert strong',
- 'Missing commits')
+ "#changeset_compare_view_content .alert strong", "Missing commits"
+ )
response.assert_response().element_contains(
- '#changeset_compare_view_content .alert',
- 'This pull request cannot be displayed, because one or more'
- ' commits no longer exist in the source repository.')
+ "#changeset_compare_view_content .alert",
+ "This pull request cannot be displayed, because one or more"
+ " commits no longer exist in the source repository.",
+ )
- def test_strip_commits_from_pull_request(
- self, backend, pr_util, csrf_token):
+ def test_strip_commits_from_pull_request(self, backend, pr_util):
commits = [
- {'message': 'initial-commit'},
- {'message': 'old-feature'},
- {'message': 'new-feature', 'parents': ['initial-commit']},
+ {"message": "initial-commit"},
+ {"message": "old-feature"},
+ {"message": "new-feature"},
]
pull_request = pr_util.create_pull_request(
- commits, target_head='initial-commit', source_head='new-feature',
- revisions=['new-feature'])
+ commits,
+ target_head="initial-commit",
+ source_head="new-feature",
+ revisions=["new-feature"],
+ )
vcs = pr_util.source_repository.scm_instance()
- if backend.alias == 'git':
- vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
+ if backend.alias == "git":
+ vcs.strip(pr_util.commit_ids["new-feature"], branch_name=pr_util.backend.default_branch_name)
else:
- vcs.strip(pr_util.commit_ids['new-feature'])
+ vcs.strip(pr_util.commit_ids["new-feature"])
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pr_util.target_repository.repo_name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pr_util.target_repository.repo_name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
assert response.status_int == 200
response.assert_response().element_contains(
- '#changeset_compare_view_content .alert strong',
- 'Missing commits')
- response.assert_response().element_contains(
- '#changeset_compare_view_content .alert',
- 'This pull request cannot be displayed, because one or more'
- ' commits no longer exist in the source repository.')
+ "#changeset_compare_view_content .alert strong", "Missing commits"
+ )
response.assert_response().element_contains(
- '#update_commits',
- 'Update commits')
+ "#changeset_compare_view_content .alert",
+ "This pull request cannot be displayed, because one or more"
+ " commits no longer exist in the source repository.",
+ )
+ response.assert_response().element_contains("#update_commits", "Update commits")
- def test_strip_commits_and_update(
- self, backend, pr_util, csrf_token):
+ def test_strip_commits_and_update(self, backend, pr_util, csrf_token):
commits = [
- {'message': 'initial-commit'},
- {'message': 'old-feature'},
- {'message': 'new-feature', 'parents': ['old-feature']},
+ {"message": "initial-commit"},
+ {"message": "old-feature"},
+ {"message": "new-feature", "parents": ["old-feature"]},
]
pull_request = pr_util.create_pull_request(
- commits, target_head='old-feature', source_head='new-feature',
- revisions=['new-feature'], mergeable=True)
+ commits,
+ target_head="old-feature",
+ source_head="new-feature",
+ revisions=["new-feature"],
+ mergeable=True,
+ )
pr_id = pull_request.pull_request_id
target_repo_name = pull_request.target_repo.repo_name
vcs = pr_util.source_repository.scm_instance()
- if backend.alias == 'git':
- vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
+ if backend.alias == "git":
+ vcs.strip(pr_util.commit_ids["new-feature"], branch_name="master")
else:
- vcs.strip(pr_util.commit_ids['new-feature'])
+ vcs.strip(pr_util.commit_ids["new-feature"])
- url = route_path('pullrequest_update',
- repo_name=target_repo_name,
- pull_request_id=pr_id)
- response = self.app.post(url,
- params={'update_commits': 'true',
- 'csrf_token': csrf_token})
+ url = route_path(
+ "pullrequest_update", repo_name=target_repo_name, pull_request_id=pr_id
+ )
+ response = self.app.post(
+ url, params={"update_commits": "true", "csrf_token": csrf_token}
+ )
assert response.status_int == 200
- assert json.loads(response.body) == json.loads('{"response": true, "redirect_url": null}')
+ assert json.loads(response.body) == json.loads(
+ '{"response": true, "redirect_url": null}'
+ )
# Make sure that after update, it won't raise 500 errors
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=target_repo_name,
- pull_request_id=pr_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show", repo_name=target_repo_name, pull_request_id=pr_id
+ )
+ )
assert response.status_int == 200
response.assert_response().element_contains(
- '#changeset_compare_view_content .alert strong',
- 'Missing commits')
+ "#changeset_compare_view_content .alert strong", "Missing commits"
+ )
def test_branch_is_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
- pull_request.source_ref = 'branch:origin:1234567890abcdef'
- pull_request.target_ref = 'branch:target:abcdef1234567890'
+ pull_request.source_ref = "branch:origin:1234567890abcdef"
+ pull_request.target_ref = "branch:target:abcdef1234567890"
Session().add(pull_request)
Session().commit()
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
assert response.status_int == 200
- source = response.assert_response().get_element('.pr-source-info')
+ source = response.assert_response().get_element(".pr-source-info")
source_parent = source.getparent()
assert len(source_parent) == 1
- target = response.assert_response().get_element('.pr-target-info')
+ target = response.assert_response().get_element(".pr-target-info")
target_parent = target.getparent()
assert len(target_parent) == 1
expected_origin_link = route_path(
- 'repo_commits',
+ "repo_commits",
repo_name=pull_request.source_repo.scm_instance().name,
- params=dict(branch='origin'))
+ params=dict(branch="origin"),
+ )
expected_target_link = route_path(
- 'repo_commits',
+ "repo_commits",
repo_name=pull_request.target_repo.scm_instance().name,
- params=dict(branch='target'))
- assert source_parent.attrib['href'] == expected_origin_link
- assert target_parent.attrib['href'] == expected_target_link
+ params=dict(branch="target"),
+ )
+ assert source_parent.attrib["href"] == expected_origin_link
+ assert target_parent.attrib["href"] == expected_target_link
def test_bookmark_is_not_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
- pull_request.source_ref = 'bookmark:origin:1234567890abcdef'
- pull_request.target_ref = 'bookmark:target:abcdef1234567890'
+ pull_request.source_ref = "bookmark:origin:1234567890abcdef"
+ pull_request.target_ref = "bookmark:target:abcdef1234567890"
Session().add(pull_request)
Session().commit()
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
assert response.status_int == 200
- source = response.assert_response().get_element('.pr-source-info')
- assert source.text.strip() == 'bookmark:origin'
- assert source.getparent().attrib.get('href') is None
+ source = response.assert_response().get_element(".pr-source-info")
+ assert source.text.strip() == "bookmark:origin"
+ assert source.getparent().attrib.get("href") is None
- target = response.assert_response().get_element('.pr-target-info')
- assert target.text.strip() == 'bookmark:target'
- assert target.getparent().attrib.get('href') is None
+ target = response.assert_response().get_element(".pr-target-info")
+ assert target.text.strip() == "bookmark:target"
+ assert target.getparent().attrib.get("href") is None
def test_tag_is_not_a_link(self, pr_util):
pull_request = pr_util.create_pull_request()
- pull_request.source_ref = 'tag:origin:1234567890abcdef'
- pull_request.target_ref = 'tag:target:abcdef1234567890'
+ pull_request.source_ref = "tag:origin:1234567890abcdef"
+ pull_request.target_ref = "tag:target:abcdef1234567890"
Session().add(pull_request)
Session().commit()
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
assert response.status_int == 200
- source = response.assert_response().get_element('.pr-source-info')
- assert source.text.strip() == 'tag:origin'
- assert source.getparent().attrib.get('href') is None
+ source = response.assert_response().get_element(".pr-source-info")
+ assert source.text.strip() == "tag:origin"
+ assert source.getparent().attrib.get("href") is None
- target = response.assert_response().get_element('.pr-target-info')
- assert target.text.strip() == 'tag:target'
- assert target.getparent().attrib.get('href') is None
+ target = response.assert_response().get_element(".pr-target-info")
+ assert target.text.strip() == "tag:target"
+ assert target.getparent().attrib.get("href") is None
- @pytest.mark.parametrize('mergeable', [True, False])
- def test_shadow_repository_link(
- self, mergeable, pr_util, http_host_only_stub):
+ @pytest.mark.parametrize("mergeable", [True, False])
+ def test_shadow_repository_link(self, mergeable, pr_util, http_host_only_stub):
"""
Check that the pull request summary page displays a link to the shadow
repository if the pull request is mergeable. If it is not mergeable
the link should not be displayed.
"""
pull_request = pr_util.create_pull_request(
- mergeable=mergeable, enable_notifications=False)
+ mergeable=mergeable, enable_notifications=False
+ )
target_repo = pull_request.target_repo.scm_instance()
pr_id = pull_request.pull_request_id
- shadow_url = '{host}/{repo}/pull-request/{pr_id}/repository'.format(
- host=http_host_only_stub, repo=target_repo.name, pr_id=pr_id)
+ shadow_url = "{host}/{repo}/pull-request/{pr_id}/repository".format(
+ host=http_host_only_stub, repo=target_repo.name, pr_id=pr_id
+ )
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=target_repo.name,
- pull_request_id=pr_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show", repo_name=target_repo.name, pull_request_id=pr_id
+ )
+ )
if mergeable:
response.assert_response().element_value_contains(
- 'input.pr-mergeinfo', shadow_url)
+ "input.pr-mergeinfo", shadow_url
+ )
response.assert_response().element_value_contains(
- 'input.pr-mergeinfo ', 'pr-merge')
+ "input.pr-mergeinfo ", "pr-merge"
+ )
else:
- response.assert_response().no_element_exists('.pr-mergeinfo')
+ response.assert_response().no_element_exists(".pr-mergeinfo")
-@pytest.mark.usefixtures('app')
+@pytest.mark.usefixtures("app")
@pytest.mark.backends("git", "hg")
class TestPullrequestsControllerDelete(object):
def test_pull_request_delete_button_permissions_admin(
- self, autologin_user, user_admin, pr_util):
+ self, autologin_user, user_admin, pr_util
+ ):
pull_request = pr_util.create_pull_request(
- author=user_admin.username, enable_notifications=False)
+ author=user_admin.username, enable_notifications=False
+ )
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
response.mustcontain('id="delete_pullrequest"')
- response.mustcontain('Confirm to delete this pull request')
+ response.mustcontain("Confirm to delete this pull request")
def test_pull_request_delete_button_permissions_owner(
- self, autologin_regular_user, user_regular, pr_util):
+ self, autologin_regular_user, user_regular, pr_util
+ ):
pull_request = pr_util.create_pull_request(
- author=user_regular.username, enable_notifications=False)
+ author=user_regular.username, enable_notifications=False
+ )
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
response.mustcontain('id="delete_pullrequest"')
- response.mustcontain('Confirm to delete this pull request')
+ response.mustcontain("Confirm to delete this pull request")
def test_pull_request_delete_button_permissions_forbidden(
- self, autologin_regular_user, user_regular, user_admin, pr_util):
+ self, autologin_regular_user, user_regular, user_admin, pr_util
+ ):
pull_request = pr_util.create_pull_request(
- author=user_admin.username, enable_notifications=False)
+ author=user_admin.username, enable_notifications=False
+ )
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
response.mustcontain(no=['id="delete_pullrequest"'])
- response.mustcontain(no=['Confirm to delete this pull request'])
+ response.mustcontain(no=["Confirm to delete this pull request"])
def test_pull_request_delete_button_permissions_can_update_cannot_delete(
- self, autologin_regular_user, user_regular, user_admin, pr_util,
- user_util):
-
+ self, autologin_regular_user, user_regular, user_admin, pr_util, user_util
+ ):
pull_request = pr_util.create_pull_request(
- author=user_admin.username, enable_notifications=False)
+ author=user_admin.username, enable_notifications=False
+ )
user_util.grant_user_permission_to_repo(
- pull_request.target_repo, user_regular,
- 'repository.write')
+ pull_request.target_repo, user_regular, "repository.write"
+ )
- response = self.app.get(route_path(
- 'pullrequest_show',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id))
+ response = self.app.get(
+ route_path(
+ "pullrequest_show",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ )
+ )
response.mustcontain('id="open_edit_pullrequest"')
response.mustcontain('id="delete_pullrequest"')
- response.mustcontain(no=['Confirm to delete this pull request'])
+ response.mustcontain(no=["Confirm to delete this pull request"])
def test_delete_comment_returns_404_if_comment_does_not_exist(
- self, autologin_user, pr_util, user_admin, csrf_token, xhr_header):
-
+ self, autologin_user, pr_util, user_admin, csrf_token, xhr_header
+ ):
pull_request = pr_util.create_pull_request(
- author=user_admin.username, enable_notifications=False)
+ author=user_admin.username, enable_notifications=False
+ )
self.app.post(
route_path(
- 'pullrequest_comment_delete',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id,
- comment_id=1024404),
+ "pullrequest_comment_delete",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ comment_id=1024404,
+ ),
extra_environ=xhr_header,
- params={'csrf_token': csrf_token},
- status=404
+ params={"csrf_token": csrf_token},
+ status=404,
)
def test_delete_comment(
- self, autologin_user, pr_util, user_admin, csrf_token, xhr_header):
-
+ self, autologin_user, pr_util, user_admin, csrf_token, xhr_header
+ ):
pull_request = pr_util.create_pull_request(
- author=user_admin.username, enable_notifications=False)
+ author=user_admin.username, enable_notifications=False
+ )
comment = pr_util.create_comment()
comment_id = comment.comment_id
response = self.app.post(
route_path(
- 'pullrequest_comment_delete',
- repo_name=pull_request.target_repo.scm_instance().name,
- pull_request_id=pull_request.pull_request_id,
- comment_id=comment_id),
+ "pullrequest_comment_delete",
+ repo_name=pull_request.target_repo.scm_instance().name,
+ pull_request_id=pull_request.pull_request_id,
+ comment_id=comment_id,
+ ),
extra_environ=xhr_header,
- params={'csrf_token': csrf_token},
- status=200
+ params={"csrf_token": csrf_token},
+ status=200,
)
- assert response.text == 'true'
+ assert response.text == "true"
- @pytest.mark.parametrize('url_type', [
- 'pullrequest_new',
- 'pullrequest_create',
- 'pullrequest_update',
- 'pullrequest_merge',
- ])
+ @pytest.mark.parametrize(
+ "url_type",
+ [
+ "pullrequest_new",
+ "pullrequest_create",
+ "pullrequest_update",
+ "pullrequest_merge",
+ ],
+ )
def test_pull_request_is_forbidden_on_archived_repo(
- self, autologin_user, backend, xhr_header, user_util, url_type):
-
+ self, autologin_user, backend, xhr_header, user_util, url_type
+ ):
# create a temporary repo
source = user_util.create_repo(repo_type=backend.alias)
repo_name = source.repo_name
@@ -1634,9 +1917,10 @@ class TestPullrequestsControllerDelete(o
Session().commit()
response = self.app.get(
- route_path(url_type, repo_name=repo_name, pull_request_id=1), status=302)
+ route_path(url_type, repo_name=repo_name, pull_request_id=1), status=302
+ )
- msg = 'Action not supported for archived repository.'
+ msg = "Action not supported for archived repository."
assert_session_flash(response, msg)
@@ -1645,7 +1929,7 @@ def assert_pull_request_status(pull_requ
assert status == expected_status
-@pytest.mark.parametrize('route', ['pullrequest_new', 'pullrequest_create'])
+@pytest.mark.parametrize("route", ["pullrequest_new", "pullrequest_create"])
@pytest.mark.usefixtures("autologin_user")
def test_forbidde_to_repo_summary_for_svn_repositories(backend_svn, app, route):
app.get(route_path(route, repo_name=backend_svn.repo_name), status=404)
diff --git a/rhodecode/model/scm.py b/rhodecode/model/scm.py
--- a/rhodecode/model/scm.py
+++ b/rhodecode/model/scm.py
@@ -442,7 +442,7 @@ class ScmModel(BaseModel):
raise
def commit_change(self, repo, repo_name, commit, user, author, message,
- content: bytes, f_path: bytes):
+ content: bytes, f_path: bytes, branch: str = None):
"""
Commits changes
"""
@@ -458,7 +458,7 @@ class ScmModel(BaseModel):
# TODO: handle pre-push action !
tip = imc.commit(
message=message, author=author, parents=[commit],
- branch=commit.branch)
+ branch=branch or commit.branch)
except Exception as e:
log.error(traceback.format_exc())
raise IMCCommitError(str(e))
diff --git a/rhodecode/tests/fixture_mods/fixture_utils.py b/rhodecode/tests/fixture_mods/fixture_utils.py
--- a/rhodecode/tests/fixture_mods/fixture_utils.py
+++ b/rhodecode/tests/fixture_mods/fixture_utils.py
@@ -564,7 +564,7 @@ class Backend(object):
self._cleanup_repos.append(repo.repo_name)
commits = commits or [
- {'message': 'Commit %s of %s' % (x, self.repo_name)}
+ {'message': f'Commit {x} of {self.repo_name}'}
for x in range(number_of_commits)]
vcs_repo = repo.scm_instance()
vcs_repo.count()
@@ -574,13 +574,15 @@ class Backend(object):
return repo
- def pull_heads(self, repo, heads):
+ def pull_heads(self, repo, heads, do_fetch=False):
"""
Make sure that repo contains all commits mentioned in `heads`
"""
vcsrepo = repo.scm_instance()
vcsrepo.config.clear_section('hooks')
commit_ids = [self._commit_ids[h] for h in heads]
+ if do_fetch:
+ vcsrepo.fetch(self._master_repo_path, commit_ids=commit_ids)
vcsrepo.pull(self._master_repo_path, commit_ids=commit_ids)
def create_fork(self):
@@ -596,10 +598,10 @@ class Backend(object):
return self.repo_name
def _next_repo_name(self):
- return u"%s_%s" % (
+ return "%s_%s" % (
self.invalid_repo_name.sub('_', self._test_name), len(self._cleanup_repos))
- def ensure_file(self, filename, content='Test content\n'):
+ def ensure_file(self, filename, content=b'Test content\n'):
assert self._cleanup_repos, "Avoid writing into vcs_test repos"
commits = [
{'added': [
@@ -628,15 +630,14 @@ class Backend(object):
if self.alias == 'git':
refs = {}
for message in self._commit_ids:
- # TODO: mikhail: do more special chars replacements
- ref_name = 'refs/test-refs/{}'.format(
- message.replace(' ', ''))
+ cleanup_message = message.replace(' ', '')
+ ref_name = f'refs/test-refs/{cleanup_message}'
refs[ref_name] = self._commit_ids[message]
self._create_refs(repo, refs)
def _create_refs(self, repo, refs):
- for ref_name in refs:
- repo.set_refs(ref_name, refs[ref_name])
+ for ref_name, ref_val in refs.items():
+ repo.set_refs(ref_name, ref_val)
class VcsBackend(object):
@@ -786,10 +787,9 @@ def _add_commits_to_repo(vcs_repo, commi
return commit_ids
imc = vcs_repo.in_memory_commit
- commit = None
for idx, commit in enumerate(commits):
- message = str(commit.get('message', 'Commit %s' % idx))
+ message = str(commit.get('message', f'Commit {idx}'))
for node in commit.get('added', []):
imc.add(FileNode(safe_bytes(node.path), content=node.content))
@@ -880,6 +880,7 @@ class PRTestUtility(object):
mergeable_patcher = None
mergeable_mock = None
notification_patcher = None
+ commit_ids: dict
def __init__(self, backend):
self.backend = backend
@@ -888,7 +889,7 @@ class PRTestUtility(object):
self, commits=None, target_head=None, source_head=None,
revisions=None, approved=False, author=None, mergeable=False,
enable_notifications=True, name_suffix='', reviewers=None, observers=None,
- title=u"Test", description=u"Description"):
+ title="Test", description="Description"):
self.set_mergeable(mergeable)
if not enable_notifications:
# mock notification side effect
@@ -949,11 +950,11 @@ class PRTestUtility(object):
def close(self):
PullRequestModel().close_pull_request(self.pull_request, self.author)
- def _default_branch_reference(self, commit_message):
- reference = '%s:%s:%s' % (
- 'branch',
- self.backend.default_branch_name,
- self.commit_ids[commit_message])
+ def _default_branch_reference(self, commit_message, branch: str = None) -> str:
+ default_branch = branch or self.backend.default_branch_name
+ message = self.commit_ids[commit_message]
+ reference = f'branch:{default_branch}:{message}'
+
return reference
def _get_reviewers(self):
@@ -968,9 +969,23 @@ class PRTestUtility(object):
]
- def update_source_repository(self, head=None):
+ def update_source_repository(self, head=None, do_fetch=False):
+ heads = [head or 'c3']
+ self.backend.pull_heads(self.source_repository, heads=heads, do_fetch=do_fetch)
+
+ def update_target_repository(self, head=None, do_fetch=False):
heads = [head or 'c3']
- self.backend.pull_heads(self.source_repository, heads=heads)
+ self.backend.pull_heads(self.target_repository, heads=heads, do_fetch=do_fetch)
+
+ def set_pr_target_ref(self, ref_type: str = "branch", ref_name: str = "branch", ref_commit_id: str = "") -> str:
+ full_ref = f"{ref_type}:{ref_name}:{ref_commit_id}"
+ self.pull_request.target_ref = full_ref
+ return full_ref
+
+ def set_pr_source_ref(self, ref_type: str = "branch", ref_name: str = "branch", ref_commit_id: str = "") -> str:
+ full_ref = f"{ref_type}:{ref_name}:{ref_commit_id}"
+ self.pull_request.source_ref = full_ref
+ return full_ref
def add_one_commit(self, head=None):
self.update_source_repository(head=head)
@@ -1000,7 +1015,7 @@ class PRTestUtility(object):
def create_comment(self, linked_to=None):
comment = CommentsModel().create(
- text=u"Test comment",
+ text="Test comment",
repo=self.target_repository.repo_name,
user=self.author,
pull_request=self.pull_request)
@@ -1014,7 +1029,7 @@ class PRTestUtility(object):
def create_inline_comment(
self, linked_to=None, line_no='n1', file_path='file_1'):
comment = CommentsModel().create(
- text=u"Test comment",
+ text="Test comment",
repo=self.target_repository.repo_name,
user=self.author,
line_no=line_no,
diff --git a/rhodecode/tests/models/test_scm.py b/rhodecode/tests/models/test_scm.py
--- a/rhodecode/tests/models/test_scm.py
+++ b/rhodecode/tests/models/test_scm.py
@@ -89,8 +89,8 @@ def test_strip_with_multiple_heads(backe
{'message': 'A'},
{'message': 'a'},
{'message': 'b'},
- {'message': 'B', 'parents': ['A']},
- {'message': 'a1'},
+ {'message': 'B', 'parents': ['A'], 'branch': 'feature'},
+ {'message': 'a1', 'branch': 'feature'},
]
repo = backend_hg.create_repo(commits=commits)
commit_ids = backend_hg.commit_ids
diff --git a/rhodecode/tests/utils.py b/rhodecode/tests/utils.py
--- a/rhodecode/tests/utils.py
+++ b/rhodecode/tests/utils.py
@@ -431,7 +431,7 @@ def repo_on_filesystem(repo_name):
def commit_change(
- repo, filename: bytes, content: bytes, message, vcs_type, parent=None, newfile=False):
+ repo, filename: bytes, content: bytes, message, vcs_type, parent=None, branch=None, newfile=False):
from rhodecode.tests import TEST_USER_ADMIN_LOGIN
repo = Repository.get_by_repo_name(repo)
@@ -459,7 +459,8 @@ def commit_change(
author=f'{TEST_USER_ADMIN_LOGIN} ',
message=message,
content=content,
- f_path=filename
+ f_path=filename,
+ branch=branch
)
return commit