|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
# Copyright (C) 2010-2017 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import urllib
|
|
|
|
|
|
import mock
|
|
|
import pytest
|
|
|
|
|
|
from rhodecode.lib import auth
|
|
|
from rhodecode.lib.utils2 import safe_str, str2bool, safe_unicode
|
|
|
from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
|
|
|
from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
|
|
|
Permission
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.model.repo import RepoModel
|
|
|
from rhodecode.model.repo_group import RepoGroupModel
|
|
|
from rhodecode.model.settings import SettingsModel, VcsSettingsModel
|
|
|
from rhodecode.model.user import UserModel
|
|
|
from rhodecode.tests import (
|
|
|
login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
|
|
|
TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
|
|
|
logout_user_session)
|
|
|
from rhodecode.tests.fixture import Fixture, error_function
|
|
|
from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
|
|
|
|
|
|
fixture = Fixture()
|
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("app")
|
|
|
class TestAdminRepos(object):
|
|
|
|
|
|
def test_index(self):
|
|
|
self.app.get(url('repos'))
|
|
|
|
|
|
def test_create_page_restricted(self, autologin_user, backend):
|
|
|
with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
|
|
|
response = self.app.get(url('new_repo'), status=200)
|
|
|
assert_response = AssertResponse(response)
|
|
|
element = assert_response.get_element('#repo_type')
|
|
|
assert element.text_content() == '\ngit\n'
|
|
|
|
|
|
def test_create_page_non_restricted(self, autologin_user, backend):
|
|
|
response = self.app.get(url('new_repo'), status=200)
|
|
|
assert_response = AssertResponse(response)
|
|
|
assert_response.element_contains('#repo_type', 'git')
|
|
|
assert_response.element_contains('#repo_type', 'svn')
|
|
|
assert_response.element_contains('#repo_type', 'hg')
|
|
|
|
|
|
@pytest.mark.parametrize("suffix",
|
|
|
[u'', u'xxa'], ids=['', 'non-ascii'])
|
|
|
def test_create(self, autologin_user, backend, suffix, csrf_token):
|
|
|
repo_name_unicode = backend.new_repo_name(suffix=suffix)
|
|
|
repo_name = repo_name_unicode.encode('utf8')
|
|
|
description_unicode = u'description for newly created repo' + suffix
|
|
|
description = description_unicode.encode('utf8')
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
csrf_token=csrf_token),
|
|
|
status=302)
|
|
|
|
|
|
self.assert_repository_is_created_correctly(
|
|
|
repo_name, description, backend)
|
|
|
|
|
|
def test_create_numeric(self, autologin_user, backend, csrf_token):
|
|
|
numeric_repo = '1234'
|
|
|
repo_name = numeric_repo
|
|
|
description = 'description for newly created repo' + numeric_repo
|
|
|
self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
csrf_token=csrf_token))
|
|
|
|
|
|
self.assert_repository_is_created_correctly(
|
|
|
repo_name, description, backend)
|
|
|
|
|
|
@pytest.mark.parametrize("suffix", [u'', u'ąćę'], ids=['', 'non-ascii'])
|
|
|
def test_create_in_group(
|
|
|
self, autologin_user, backend, suffix, csrf_token):
|
|
|
# create GROUP
|
|
|
group_name = 'sometest_%s' % backend.alias
|
|
|
gr = RepoGroupModel().create(group_name=group_name,
|
|
|
group_description='test',
|
|
|
owner=TEST_USER_ADMIN_LOGIN)
|
|
|
Session().commit()
|
|
|
|
|
|
repo_name = u'ingroup' + suffix
|
|
|
repo_name_full = RepoGroup.url_sep().join(
|
|
|
[group_name, repo_name])
|
|
|
description = u'description for newly created repo'
|
|
|
self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=safe_str(repo_name),
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
repo_group=gr.group_id,
|
|
|
csrf_token=csrf_token))
|
|
|
|
|
|
# TODO: johbo: Cleanup work to fixture
|
|
|
try:
|
|
|
self.assert_repository_is_created_correctly(
|
|
|
repo_name_full, description, backend)
|
|
|
|
|
|
new_repo = RepoModel().get_by_repo_name(repo_name_full)
|
|
|
inherited_perms = UserRepoToPerm.query().filter(
|
|
|
UserRepoToPerm.repository_id == new_repo.repo_id).all()
|
|
|
assert len(inherited_perms) == 1
|
|
|
finally:
|
|
|
RepoModel().delete(repo_name_full)
|
|
|
RepoGroupModel().delete(group_name)
|
|
|
Session().commit()
|
|
|
|
|
|
def test_create_in_group_numeric(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
# create GROUP
|
|
|
group_name = 'sometest_%s' % backend.alias
|
|
|
gr = RepoGroupModel().create(group_name=group_name,
|
|
|
group_description='test',
|
|
|
owner=TEST_USER_ADMIN_LOGIN)
|
|
|
Session().commit()
|
|
|
|
|
|
repo_name = '12345'
|
|
|
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
|
|
|
description = 'description for newly created repo'
|
|
|
self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
repo_group=gr.group_id,
|
|
|
csrf_token=csrf_token))
|
|
|
|
|
|
# TODO: johbo: Cleanup work to fixture
|
|
|
try:
|
|
|
self.assert_repository_is_created_correctly(
|
|
|
repo_name_full, description, backend)
|
|
|
|
|
|
new_repo = RepoModel().get_by_repo_name(repo_name_full)
|
|
|
inherited_perms = UserRepoToPerm.query()\
|
|
|
.filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
|
|
|
assert len(inherited_perms) == 1
|
|
|
finally:
|
|
|
RepoModel().delete(repo_name_full)
|
|
|
RepoGroupModel().delete(group_name)
|
|
|
Session().commit()
|
|
|
|
|
|
def test_create_in_group_without_needed_permissions(self, backend):
|
|
|
session = login_user_session(
|
|
|
self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
|
|
|
csrf_token = auth.get_csrf_token(session)
|
|
|
# revoke
|
|
|
user_model = UserModel()
|
|
|
# disable fork and create on default user
|
|
|
user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
|
|
|
user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
|
|
|
user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
|
|
|
user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
|
|
|
|
|
|
# disable on regular user
|
|
|
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
|
|
|
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
|
|
|
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
|
|
|
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
|
|
|
Session().commit()
|
|
|
|
|
|
# create GROUP
|
|
|
group_name = 'reg_sometest_%s' % backend.alias
|
|
|
gr = RepoGroupModel().create(group_name=group_name,
|
|
|
group_description='test',
|
|
|
owner=TEST_USER_ADMIN_LOGIN)
|
|
|
Session().commit()
|
|
|
|
|
|
group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
|
|
|
gr_allowed = RepoGroupModel().create(
|
|
|
group_name=group_name_allowed,
|
|
|
group_description='test',
|
|
|
owner=TEST_USER_REGULAR_LOGIN)
|
|
|
Session().commit()
|
|
|
|
|
|
repo_name = 'ingroup'
|
|
|
description = 'description for newly created repo'
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
repo_group=gr.group_id,
|
|
|
csrf_token=csrf_token))
|
|
|
|
|
|
response.mustcontain('Invalid value')
|
|
|
|
|
|
# user is allowed to create in this group
|
|
|
repo_name = 'ingroup'
|
|
|
repo_name_full = RepoGroup.url_sep().join(
|
|
|
[group_name_allowed, repo_name])
|
|
|
description = 'description for newly created repo'
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
repo_group=gr_allowed.group_id,
|
|
|
csrf_token=csrf_token))
|
|
|
|
|
|
# TODO: johbo: Cleanup in pytest fixture
|
|
|
try:
|
|
|
self.assert_repository_is_created_correctly(
|
|
|
repo_name_full, description, backend)
|
|
|
|
|
|
new_repo = RepoModel().get_by_repo_name(repo_name_full)
|
|
|
inherited_perms = UserRepoToPerm.query().filter(
|
|
|
UserRepoToPerm.repository_id == new_repo.repo_id).all()
|
|
|
assert len(inherited_perms) == 1
|
|
|
|
|
|
assert repo_on_filesystem(repo_name_full)
|
|
|
finally:
|
|
|
RepoModel().delete(repo_name_full)
|
|
|
RepoGroupModel().delete(group_name)
|
|
|
RepoGroupModel().delete(group_name_allowed)
|
|
|
Session().commit()
|
|
|
|
|
|
def test_create_in_group_inherit_permissions(self, autologin_user, backend,
|
|
|
csrf_token):
|
|
|
# create GROUP
|
|
|
group_name = 'sometest_%s' % backend.alias
|
|
|
gr = RepoGroupModel().create(group_name=group_name,
|
|
|
group_description='test',
|
|
|
owner=TEST_USER_ADMIN_LOGIN)
|
|
|
perm = Permission.get_by_key('repository.write')
|
|
|
RepoGroupModel().grant_user_permission(
|
|
|
gr, TEST_USER_REGULAR_LOGIN, perm)
|
|
|
|
|
|
# add repo permissions
|
|
|
Session().commit()
|
|
|
|
|
|
repo_name = 'ingroup_inherited_%s' % backend.alias
|
|
|
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
|
|
|
description = 'description for newly created repo'
|
|
|
self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
repo_group=gr.group_id,
|
|
|
repo_copy_permissions=True,
|
|
|
csrf_token=csrf_token))
|
|
|
|
|
|
# TODO: johbo: Cleanup to pytest fixture
|
|
|
try:
|
|
|
self.assert_repository_is_created_correctly(
|
|
|
repo_name_full, description, backend)
|
|
|
except Exception:
|
|
|
RepoGroupModel().delete(group_name)
|
|
|
Session().commit()
|
|
|
raise
|
|
|
|
|
|
# check if inherited permissions are applied
|
|
|
new_repo = RepoModel().get_by_repo_name(repo_name_full)
|
|
|
inherited_perms = UserRepoToPerm.query().filter(
|
|
|
UserRepoToPerm.repository_id == new_repo.repo_id).all()
|
|
|
assert len(inherited_perms) == 2
|
|
|
|
|
|
assert TEST_USER_REGULAR_LOGIN in [
|
|
|
x.user.username for x in inherited_perms]
|
|
|
assert 'repository.write' in [
|
|
|
x.permission.permission_name for x in inherited_perms]
|
|
|
|
|
|
RepoModel().delete(repo_name_full)
|
|
|
RepoGroupModel().delete(group_name)
|
|
|
Session().commit()
|
|
|
|
|
|
@pytest.mark.xfail_backends(
|
|
|
"git", "hg", reason="Missing reposerver support")
|
|
|
def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
|
|
|
csrf_token):
|
|
|
source_repo = backend.create_repo(number_of_commits=2)
|
|
|
source_repo_name = source_repo.repo_name
|
|
|
reposerver.serve(source_repo.scm_instance())
|
|
|
|
|
|
repo_name = backend.new_repo_name()
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description='',
|
|
|
clone_uri=reposerver.url,
|
|
|
csrf_token=csrf_token),
|
|
|
status=302)
|
|
|
|
|
|
# Should be redirected to the creating page
|
|
|
response.mustcontain('repo_creating')
|
|
|
|
|
|
# Expecting that both repositories have same history
|
|
|
source_repo = RepoModel().get_by_repo_name(source_repo_name)
|
|
|
source_vcs = source_repo.scm_instance()
|
|
|
repo = RepoModel().get_by_repo_name(repo_name)
|
|
|
repo_vcs = repo.scm_instance()
|
|
|
assert source_vcs[0].message == repo_vcs[0].message
|
|
|
assert source_vcs.count() == repo_vcs.count()
|
|
|
assert source_vcs.commit_ids == repo_vcs.commit_ids
|
|
|
|
|
|
@pytest.mark.xfail_backends("svn", reason="Depends on import support")
|
|
|
def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
|
|
|
csrf_token):
|
|
|
repo_name = backend.new_repo_name()
|
|
|
description = 'description for newly created repo'
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
clone_uri='http://repo.invalid/repo',
|
|
|
csrf_token=csrf_token))
|
|
|
response.mustcontain('invalid clone url')
|
|
|
|
|
|
@pytest.mark.xfail_backends("svn", reason="Depends on import support")
|
|
|
def test_create_remote_repo_wrong_clone_uri_hg_svn(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.new_repo_name()
|
|
|
description = 'description for newly created repo'
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
clone_uri='svn+http://svn.invalid/repo',
|
|
|
csrf_token=csrf_token))
|
|
|
response.mustcontain('invalid clone url')
|
|
|
|
|
|
def test_create_with_git_suffix(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.new_repo_name() + ".git"
|
|
|
description = 'description for newly created repo'
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
csrf_token=csrf_token))
|
|
|
response.mustcontain('Repository name cannot end with .git')
|
|
|
|
|
|
@pytest.mark.parametrize("suffix", [u'', u'ąęł'], ids=['', 'non-ascii'])
|
|
|
def test_delete(self, autologin_user, backend, suffix, csrf_token):
|
|
|
repo = backend.create_repo(name_suffix=suffix)
|
|
|
repo_name = repo.repo_name
|
|
|
|
|
|
response = self.app.post(url('repo', repo_name=repo_name),
|
|
|
params={'_method': 'delete',
|
|
|
'csrf_token': csrf_token})
|
|
|
assert_session_flash(response, 'Deleted repository %s' % (repo_name))
|
|
|
response.follow()
|
|
|
|
|
|
# check if repo was deleted from db
|
|
|
assert RepoModel().get_by_repo_name(repo_name) is None
|
|
|
assert not repo_on_filesystem(repo_name)
|
|
|
|
|
|
def test_show(self, autologin_user, backend):
|
|
|
self.app.get(url('repo', repo_name=backend.repo_name))
|
|
|
|
|
|
def test_edit(self, backend, autologin_user):
|
|
|
self.app.get(url('edit_repo', repo_name=backend.repo_name))
|
|
|
|
|
|
def test_edit_accessible_when_missing_requirements(
|
|
|
self, backend_hg, autologin_user):
|
|
|
scm_patcher = mock.patch.object(
|
|
|
Repository, 'scm_instance', side_effect=RepositoryRequirementError)
|
|
|
with scm_patcher:
|
|
|
self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
|
|
|
|
|
|
def test_set_private_flag_sets_default_to_none(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
# initially repository perm should be read
|
|
|
perm = _get_permission_for_user(user='default', repo=backend.repo_name)
|
|
|
assert len(perm) == 1
|
|
|
assert perm[0].permission.permission_name == 'repository.read'
|
|
|
assert not backend.repo.private
|
|
|
|
|
|
response = self.app.post(
|
|
|
url('repo', repo_name=backend.repo_name),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=1,
|
|
|
repo_name=backend.repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
user=TEST_USER_ADMIN_LOGIN,
|
|
|
_method='put',
|
|
|
csrf_token=csrf_token))
|
|
|
assert_session_flash(
|
|
|
response,
|
|
|
msg='Repository %s updated successfully' % (backend.repo_name))
|
|
|
assert backend.repo.private
|
|
|
|
|
|
# now the repo default permission should be None
|
|
|
perm = _get_permission_for_user(user='default', repo=backend.repo_name)
|
|
|
assert len(perm) == 1
|
|
|
assert perm[0].permission.permission_name == 'repository.none'
|
|
|
|
|
|
response = self.app.post(
|
|
|
url('repo', repo_name=backend.repo_name),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=backend.repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
user=TEST_USER_ADMIN_LOGIN,
|
|
|
_method='put',
|
|
|
csrf_token=csrf_token))
|
|
|
assert_session_flash(
|
|
|
response,
|
|
|
msg='Repository %s updated successfully' % (backend.repo_name))
|
|
|
assert not backend.repo.private
|
|
|
|
|
|
# we turn off private now the repo default permission should stay None
|
|
|
perm = _get_permission_for_user(user='default', repo=backend.repo_name)
|
|
|
assert len(perm) == 1
|
|
|
assert perm[0].permission.permission_name == 'repository.none'
|
|
|
|
|
|
# update this permission back
|
|
|
perm[0].permission = Permission.get_by_key('repository.read')
|
|
|
Session().add(perm[0])
|
|
|
Session().commit()
|
|
|
|
|
|
def test_default_user_cannot_access_private_repo_in_a_group(
|
|
|
self, autologin_user, user_util, backend, csrf_token):
|
|
|
|
|
|
group = user_util.create_repo_group()
|
|
|
|
|
|
repo = backend.create_repo(
|
|
|
repo_private=True, repo_group=group, repo_copy_permissions=True)
|
|
|
|
|
|
permissions = _get_permission_for_user(
|
|
|
user='default', repo=repo.repo_name)
|
|
|
assert len(permissions) == 1
|
|
|
assert permissions[0].permission.permission_name == 'repository.none'
|
|
|
assert permissions[0].repository.private is True
|
|
|
|
|
|
def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
|
|
|
repo = backend.repo
|
|
|
response = self.app.get(
|
|
|
url('edit_repo_advanced', repo_name=backend.repo_name))
|
|
|
opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
|
|
|
response.mustcontain(no=[opt])
|
|
|
|
|
|
def test_set_fork_of_target_repo(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
target_repo = 'target_%s' % backend.alias
|
|
|
fixture.create_repo(target_repo, repo_type=backend.alias)
|
|
|
repo2 = Repository.get_by_repo_name(target_repo)
|
|
|
response = self.app.post(
|
|
|
url('edit_repo_advanced_fork', repo_name=backend.repo_name),
|
|
|
params={'id_fork_of': repo2.repo_id, '_method': 'put',
|
|
|
'csrf_token': csrf_token})
|
|
|
repo = Repository.get_by_repo_name(backend.repo_name)
|
|
|
repo2 = Repository.get_by_repo_name(target_repo)
|
|
|
assert_session_flash(
|
|
|
response,
|
|
|
'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
|
|
|
|
|
|
assert repo.fork == repo2
|
|
|
response = response.follow()
|
|
|
# check if given repo is selected
|
|
|
|
|
|
opt = 'This repository is a fork of <a href="%s">%s</a>' % (
|
|
|
url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
|
|
|
|
|
|
response.mustcontain(opt)
|
|
|
|
|
|
fixture.destroy_repo(target_repo, forks='detach')
|
|
|
|
|
|
@pytest.mark.backends("hg", "git")
|
|
|
def test_set_fork_of_other_type_repo(self, autologin_user, backend,
|
|
|
csrf_token):
|
|
|
TARGET_REPO_MAP = {
|
|
|
'git': {
|
|
|
'type': 'hg',
|
|
|
'repo_name': HG_REPO},
|
|
|
'hg': {
|
|
|
'type': 'git',
|
|
|
'repo_name': GIT_REPO},
|
|
|
}
|
|
|
target_repo = TARGET_REPO_MAP[backend.alias]
|
|
|
|
|
|
repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
|
|
|
response = self.app.post(
|
|
|
url('edit_repo_advanced_fork', repo_name=backend.repo_name),
|
|
|
params={'id_fork_of': repo2.repo_id, '_method': 'put',
|
|
|
'csrf_token': csrf_token})
|
|
|
assert_session_flash(
|
|
|
response,
|
|
|
'Cannot set repository as fork of repository with other type')
|
|
|
|
|
|
def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
|
|
|
# mark it as None
|
|
|
response = self.app.post(
|
|
|
url('edit_repo_advanced_fork', repo_name=backend.repo_name),
|
|
|
params={'id_fork_of': None, '_method': 'put',
|
|
|
'csrf_token': csrf_token})
|
|
|
assert_session_flash(
|
|
|
response,
|
|
|
'Marked repo %s as fork of %s'
|
|
|
% (backend.repo_name, "Nothing"))
|
|
|
assert backend.repo.fork is None
|
|
|
|
|
|
def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
|
|
|
repo = Repository.get_by_repo_name(backend.repo_name)
|
|
|
response = self.app.post(
|
|
|
url('edit_repo_advanced_fork', repo_name=backend.repo_name),
|
|
|
params={'id_fork_of': repo.repo_id, '_method': 'put',
|
|
|
'csrf_token': csrf_token})
|
|
|
assert_session_flash(
|
|
|
response, 'An error occurred during this operation')
|
|
|
|
|
|
def test_create_on_top_level_without_permissions(self, backend):
|
|
|
session = login_user_session(
|
|
|
self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
|
|
|
csrf_token = auth.get_csrf_token(session)
|
|
|
|
|
|
# revoke
|
|
|
user_model = UserModel()
|
|
|
# disable fork and create on default user
|
|
|
user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
|
|
|
user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
|
|
|
user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
|
|
|
user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
|
|
|
|
|
|
# disable on regular user
|
|
|
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
|
|
|
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
|
|
|
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
|
|
|
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
|
|
|
Session().commit()
|
|
|
|
|
|
repo_name = backend.new_repo_name()
|
|
|
description = 'description for newly created repo'
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
csrf_token=csrf_token))
|
|
|
|
|
|
response.mustcontain(
|
|
|
u"You do not have the permission to store repositories in "
|
|
|
u"the root location.")
|
|
|
|
|
|
@mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
|
|
|
def test_create_repo_when_filesystem_op_fails(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.new_repo_name()
|
|
|
description = 'description for newly created repo'
|
|
|
|
|
|
response = self.app.post(
|
|
|
url('repos'),
|
|
|
fixture._get_repo_create_params(
|
|
|
repo_private=False,
|
|
|
repo_name=repo_name,
|
|
|
repo_type=backend.alias,
|
|
|
repo_description=description,
|
|
|
csrf_token=csrf_token))
|
|
|
|
|
|
assert_session_flash(
|
|
|
response, 'Error creating repository %s' % repo_name)
|
|
|
# repo must not be in db
|
|
|
assert backend.repo is None
|
|
|
# repo must not be in filesystem !
|
|
|
assert not repo_on_filesystem(repo_name)
|
|
|
|
|
|
def assert_repository_is_created_correctly(
|
|
|
self, repo_name, description, backend):
|
|
|
repo_name_utf8 = safe_str(repo_name)
|
|
|
|
|
|
# run the check page that triggers the flash message
|
|
|
response = self.app.get(url('repo_check_home', repo_name=repo_name))
|
|
|
assert response.json == {u'result': True}
|
|
|
|
|
|
flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
|
|
|
urllib.quote(repo_name_utf8), repo_name)
|
|
|
assert_session_flash(response, flash_msg)
|
|
|
|
|
|
# test if the repo was created in the database
|
|
|
new_repo = RepoModel().get_by_repo_name(repo_name)
|
|
|
|
|
|
assert new_repo.repo_name == repo_name
|
|
|
assert new_repo.description == description
|
|
|
|
|
|
# test if the repository is visible in the list ?
|
|
|
response = self.app.get(url('summary_home', repo_name=repo_name))
|
|
|
response.mustcontain(repo_name)
|
|
|
response.mustcontain(backend.alias)
|
|
|
|
|
|
assert repo_on_filesystem(repo_name)
|
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("app")
|
|
|
class TestVcsSettings(object):
|
|
|
FORM_DATA = {
|
|
|
'inherit_global_settings': False,
|
|
|
'hooks_changegroup_repo_size': False,
|
|
|
'hooks_changegroup_push_logger': False,
|
|
|
'hooks_outgoing_pull_logger': False,
|
|
|
'extensions_largefiles': False,
|
|
|
'phases_publish': 'False',
|
|
|
'rhodecode_pr_merge_enabled': False,
|
|
|
'rhodecode_use_outdated_comments': False,
|
|
|
'new_svn_branch': '',
|
|
|
'new_svn_tag': ''
|
|
|
}
|
|
|
|
|
|
@pytest.mark.skip_backends('svn')
|
|
|
def test_global_settings_initial_values(self, autologin_user, backend):
|
|
|
repo_name = backend.repo_name
|
|
|
response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
|
|
|
|
|
|
expected_settings = (
|
|
|
'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
|
|
|
'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
|
|
|
'hooks_outgoing_pull_logger'
|
|
|
)
|
|
|
for setting in expected_settings:
|
|
|
self.assert_repo_value_equals_global_value(response, setting)
|
|
|
|
|
|
def test_show_settings_requires_repo_admin_permission(
|
|
|
self, backend, user_util, settings_util):
|
|
|
repo = backend.create_repo()
|
|
|
repo_name = repo.repo_name
|
|
|
user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
|
|
|
user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
|
|
|
login_user_session(
|
|
|
self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
|
|
|
self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
|
|
|
|
|
|
def test_inherit_global_settings_flag_is_true_by_default(
|
|
|
self, autologin_user, backend):
|
|
|
repo_name = backend.repo_name
|
|
|
response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
|
|
|
|
|
|
assert_response = AssertResponse(response)
|
|
|
element = assert_response.get_element('#inherit_global_settings')
|
|
|
assert element.checked
|
|
|
|
|
|
@pytest.mark.parametrize('checked_value', [True, False])
|
|
|
def test_inherit_global_settings_value(
|
|
|
self, autologin_user, backend, checked_value, settings_util):
|
|
|
repo = backend.create_repo()
|
|
|
repo_name = repo.repo_name
|
|
|
settings_util.create_repo_rhodecode_setting(
|
|
|
repo, 'inherit_vcs_settings', checked_value, 'bool')
|
|
|
response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
|
|
|
|
|
|
assert_response = AssertResponse(response)
|
|
|
element = assert_response.get_element('#inherit_global_settings')
|
|
|
assert element.checked == checked_value
|
|
|
|
|
|
@pytest.mark.skip_backends('svn')
|
|
|
def test_hooks_settings_are_created(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
for section, key in VcsSettingsModel.HOOKS_SETTINGS:
|
|
|
ui = settings.get_ui_by_section_and_key(section, key)
|
|
|
assert ui.ui_active is False
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_hooks_settings_are_not_created_for_svn(
|
|
|
self, autologin_user, backend_svn, csrf_token):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
for section, key in VcsSettingsModel.HOOKS_SETTINGS:
|
|
|
ui = settings.get_ui_by_section_and_key(section, key)
|
|
|
assert ui is None
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
@pytest.mark.skip_backends('svn')
|
|
|
def test_hooks_settings_are_updated(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
for section, key in VcsSettingsModel.HOOKS_SETTINGS:
|
|
|
settings.create_ui_section_value(section, '', key=key, active=True)
|
|
|
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
try:
|
|
|
for section, key in VcsSettingsModel.HOOKS_SETTINGS:
|
|
|
ui = settings.get_ui_by_section_and_key(section, key)
|
|
|
assert ui.ui_active is False
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_hooks_settings_are_not_updated_for_svn(
|
|
|
self, autologin_user, backend_svn, csrf_token):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
for section, key in VcsSettingsModel.HOOKS_SETTINGS:
|
|
|
settings.create_ui_section_value(section, '', key=key, active=True)
|
|
|
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
try:
|
|
|
for section, key in VcsSettingsModel.HOOKS_SETTINGS:
|
|
|
ui = settings.get_ui_by_section_and_key(section, key)
|
|
|
assert ui.ui_active is True
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
@pytest.mark.skip_backends('svn')
|
|
|
def test_pr_settings_are_created(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
for name in VcsSettingsModel.GENERAL_SETTINGS:
|
|
|
setting = settings.get_setting_by_name(name)
|
|
|
assert setting.app_settings_value is False
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_pr_settings_are_not_created_for_svn(
|
|
|
self, autologin_user, backend_svn, csrf_token):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
for name in VcsSettingsModel.GENERAL_SETTINGS:
|
|
|
setting = settings.get_setting_by_name(name)
|
|
|
assert setting is None
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_pr_settings_creation_requires_repo_admin_permission(
|
|
|
self, backend, user_util, settings_util, csrf_token):
|
|
|
repo = backend.create_repo()
|
|
|
repo_name = repo.repo_name
|
|
|
user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
|
|
|
|
|
|
logout_user_session(self.app, csrf_token)
|
|
|
session = login_user_session(
|
|
|
self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
|
|
|
new_csrf_token = auth.get_csrf_token(session)
|
|
|
|
|
|
user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = new_csrf_token
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
|
|
|
try:
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data,
|
|
|
status=302)
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
@pytest.mark.skip_backends('svn')
|
|
|
def test_pr_settings_are_updated(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
for name in VcsSettingsModel.GENERAL_SETTINGS:
|
|
|
settings.create_or_update_setting(name, True, 'bool')
|
|
|
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
try:
|
|
|
for name in VcsSettingsModel.GENERAL_SETTINGS:
|
|
|
setting = settings.get_setting_by_name(name)
|
|
|
assert setting.app_settings_value is False
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_pr_settings_are_not_updated_for_svn(
|
|
|
self, autologin_user, backend_svn, csrf_token):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
for name in VcsSettingsModel.GENERAL_SETTINGS:
|
|
|
settings.create_or_update_setting(name, True, 'bool')
|
|
|
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
try:
|
|
|
for name in VcsSettingsModel.GENERAL_SETTINGS:
|
|
|
setting = settings.get_setting_by_name(name)
|
|
|
assert setting.app_settings_value is True
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_svn_settings_are_created(
|
|
|
self, autologin_user, backend_svn, csrf_token, settings_util):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['new_svn_tag'] = 'svn-tag'
|
|
|
data['new_svn_branch'] = 'svn-branch'
|
|
|
data['csrf_token'] = csrf_token
|
|
|
|
|
|
# Create few global settings to make sure that uniqueness validators
|
|
|
# are not triggered
|
|
|
settings_util.create_rhodecode_ui(
|
|
|
VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
|
|
|
settings_util.create_rhodecode_ui(
|
|
|
VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
|
|
|
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
svn_branches = settings.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_BRANCH_SECTION)
|
|
|
svn_branch_names = [b.ui_value for b in svn_branches]
|
|
|
svn_tags = settings.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_TAG_SECTION)
|
|
|
svn_tag_names = [b.ui_value for b in svn_tags]
|
|
|
assert 'svn-branch' in svn_branch_names
|
|
|
assert 'svn-tag' in svn_tag_names
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_svn_settings_are_unique(
|
|
|
self, autologin_user, backend_svn, csrf_token, settings_util):
|
|
|
repo = backend_svn.repo
|
|
|
repo_name = repo.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['new_svn_tag'] = 'test_tag'
|
|
|
data['new_svn_branch'] = 'test_branch'
|
|
|
data['csrf_token'] = csrf_token
|
|
|
settings_util.create_repo_rhodecode_ui(
|
|
|
repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
|
|
|
settings_util.create_repo_rhodecode_ui(
|
|
|
repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
|
|
|
|
|
|
response = self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=200)
|
|
|
response.mustcontain('Pattern already exists')
|
|
|
|
|
|
def test_svn_settings_with_empty_values_are_not_created(
|
|
|
self, autologin_user, backend_svn, csrf_token):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
svn_branches = settings.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_BRANCH_SECTION)
|
|
|
svn_tags = settings.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_TAG_SECTION)
|
|
|
assert len(svn_branches) == 0
|
|
|
assert len(svn_tags) == 0
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_svn_settings_are_shown_for_svn_repository(
|
|
|
self, autologin_user, backend_svn, csrf_token):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
response = self.app.get(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), status=200)
|
|
|
response.mustcontain('Subversion Settings')
|
|
|
|
|
|
@pytest.mark.skip_backends('svn')
|
|
|
def test_svn_settings_are_not_created_for_not_svn_repository(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
svn_branches = settings.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_BRANCH_SECTION)
|
|
|
svn_tags = settings.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_TAG_SECTION)
|
|
|
assert len(svn_branches) == 0
|
|
|
assert len(svn_tags) == 0
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
@pytest.mark.skip_backends('svn')
|
|
|
def test_svn_settings_are_shown_only_for_svn_repository(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
response = self.app.get(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), status=200)
|
|
|
response.mustcontain(no='Subversion Settings')
|
|
|
|
|
|
def test_hg_settings_are_created(
|
|
|
self, autologin_user, backend_hg, csrf_token):
|
|
|
repo_name = backend_hg.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['new_svn_tag'] = 'svn-tag'
|
|
|
data['new_svn_branch'] = 'svn-branch'
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
largefiles_ui = settings.get_ui_by_section_and_key(
|
|
|
'extensions', 'largefiles')
|
|
|
assert largefiles_ui.ui_active is False
|
|
|
phases_ui = settings.get_ui_by_section_and_key(
|
|
|
'phases', 'publish')
|
|
|
assert str2bool(phases_ui.ui_value) is False
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_hg_settings_are_updated(
|
|
|
self, autologin_user, backend_hg, csrf_token):
|
|
|
repo_name = backend_hg.repo_name
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
settings.create_ui_section_value(
|
|
|
'extensions', '', key='largefiles', active=True)
|
|
|
settings.create_ui_section_value(
|
|
|
'phases', '1', key='publish', active=True)
|
|
|
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
try:
|
|
|
largefiles_ui = settings.get_ui_by_section_and_key(
|
|
|
'extensions', 'largefiles')
|
|
|
assert largefiles_ui.ui_active is False
|
|
|
phases_ui = settings.get_ui_by_section_and_key(
|
|
|
'phases', 'publish')
|
|
|
assert str2bool(phases_ui.ui_value) is False
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_hg_settings_are_shown_for_hg_repository(
|
|
|
self, autologin_user, backend_hg, csrf_token):
|
|
|
repo_name = backend_hg.repo_name
|
|
|
response = self.app.get(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), status=200)
|
|
|
response.mustcontain('Mercurial Settings')
|
|
|
|
|
|
@pytest.mark.skip_backends('hg')
|
|
|
def test_hg_settings_are_created_only_for_hg_repository(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
largefiles_ui = settings.get_ui_by_section_and_key(
|
|
|
'extensions', 'largefiles')
|
|
|
assert largefiles_ui is None
|
|
|
phases_ui = settings.get_ui_by_section_and_key(
|
|
|
'phases', 'publish')
|
|
|
assert phases_ui is None
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
@pytest.mark.skip_backends('hg')
|
|
|
def test_hg_settings_are_shown_only_for_hg_repository(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
response = self.app.get(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), status=200)
|
|
|
response.mustcontain(no='Mercurial Settings')
|
|
|
|
|
|
@pytest.mark.skip_backends('hg')
|
|
|
def test_hg_settings_are_updated_only_for_hg_repository(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
settings.create_ui_section_value(
|
|
|
'extensions', '', key='largefiles', active=True)
|
|
|
settings.create_ui_section_value(
|
|
|
'phases', '1', key='publish', active=True)
|
|
|
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
try:
|
|
|
largefiles_ui = settings.get_ui_by_section_and_key(
|
|
|
'extensions', 'largefiles')
|
|
|
assert largefiles_ui.ui_active is True
|
|
|
phases_ui = settings.get_ui_by_section_and_key(
|
|
|
'phases', 'publish')
|
|
|
assert phases_ui.ui_value == '1'
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_per_repo_svn_settings_are_displayed(
|
|
|
self, autologin_user, backend_svn, settings_util):
|
|
|
repo = backend_svn.create_repo()
|
|
|
repo_name = repo.repo_name
|
|
|
branches = [
|
|
|
settings_util.create_repo_rhodecode_ui(
|
|
|
repo, VcsSettingsModel.SVN_BRANCH_SECTION,
|
|
|
'branch_{}'.format(i))
|
|
|
for i in range(10)]
|
|
|
tags = [
|
|
|
settings_util.create_repo_rhodecode_ui(
|
|
|
repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
|
|
|
for i in range(10)]
|
|
|
|
|
|
response = self.app.get(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), status=200)
|
|
|
assert_response = AssertResponse(response)
|
|
|
for branch in branches:
|
|
|
css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
|
|
|
element = assert_response.get_element(css_selector)
|
|
|
assert element.value == branch.ui_value
|
|
|
for tag in tags:
|
|
|
css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
|
|
|
element = assert_response.get_element(css_selector)
|
|
|
assert element.value == tag.ui_value
|
|
|
|
|
|
def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
|
|
|
self, autologin_user, backend_svn, settings_util):
|
|
|
repo = backend_svn.create_repo()
|
|
|
repo_name = repo.repo_name
|
|
|
response = self.app.get(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), status=200)
|
|
|
response.mustcontain(no='<label>Hooks:</label>')
|
|
|
response.mustcontain(no='<label>Pull Request Settings:</label>')
|
|
|
|
|
|
def test_inherit_global_settings_value_is_saved(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
data['inherit_global_settings'] = True
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
vcs_settings = VcsSettingsModel(repo=repo_name)
|
|
|
try:
|
|
|
assert vcs_settings.inherit_global_settings is True
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_repo_cache_is_invalidated_when_settings_are_updated(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
data['inherit_global_settings'] = True
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
|
|
|
invalidation_patcher = mock.patch(
|
|
|
'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
|
|
|
with invalidation_patcher as invalidation_mock:
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data,
|
|
|
status=302)
|
|
|
try:
|
|
|
invalidation_mock.assert_called_once_with(repo_name, delete=True)
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_other_settings_not_saved_inherit_global_settings_is_true(
|
|
|
self, autologin_user, backend, csrf_token):
|
|
|
repo_name = backend.repo_name
|
|
|
data = self.FORM_DATA.copy()
|
|
|
data['csrf_token'] = csrf_token
|
|
|
data['inherit_global_settings'] = True
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data, status=302)
|
|
|
|
|
|
settings = SettingsModel(repo=repo_name)
|
|
|
ui_settings = (
|
|
|
VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
|
|
|
|
|
|
vcs_settings = []
|
|
|
try:
|
|
|
for section, key in ui_settings:
|
|
|
ui = settings.get_ui_by_section_and_key(section, key)
|
|
|
if ui:
|
|
|
vcs_settings.append(ui)
|
|
|
vcs_settings.extend(settings.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_BRANCH_SECTION))
|
|
|
vcs_settings.extend(settings.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_TAG_SECTION))
|
|
|
for name in VcsSettingsModel.GENERAL_SETTINGS:
|
|
|
setting = settings.get_setting_by_name(name)
|
|
|
if setting:
|
|
|
vcs_settings.append(setting)
|
|
|
assert vcs_settings == []
|
|
|
finally:
|
|
|
self._cleanup_repo_settings(settings)
|
|
|
|
|
|
def test_delete_svn_branch_and_tag_patterns(
|
|
|
self, autologin_user, backend_svn, settings_util, csrf_token):
|
|
|
repo = backend_svn.create_repo()
|
|
|
repo_name = repo.repo_name
|
|
|
branch = settings_util.create_repo_rhodecode_ui(
|
|
|
repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
|
|
|
cleanup=False)
|
|
|
tag = settings_util.create_repo_rhodecode_ui(
|
|
|
repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
|
|
|
data = {
|
|
|
'_method': 'delete',
|
|
|
'csrf_token': csrf_token
|
|
|
}
|
|
|
for id_ in (branch.ui_id, tag.ui_id):
|
|
|
data['delete_svn_pattern'] = id_,
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data,
|
|
|
headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
|
|
|
settings = VcsSettingsModel(repo=repo_name)
|
|
|
assert settings.get_repo_svn_branch_patterns() == []
|
|
|
|
|
|
def test_delete_svn_branch_requires_repo_admin_permission(
|
|
|
self, backend_svn, user_util, settings_util, csrf_token):
|
|
|
repo = backend_svn.create_repo()
|
|
|
repo_name = repo.repo_name
|
|
|
user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
|
|
|
logout_user_session(self.app, csrf_token)
|
|
|
session = login_user_session(
|
|
|
self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
|
|
|
csrf_token = auth.get_csrf_token(session)
|
|
|
user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
|
|
|
branch = settings_util.create_repo_rhodecode_ui(
|
|
|
repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
|
|
|
cleanup=False)
|
|
|
data = {
|
|
|
'_method': 'delete',
|
|
|
'csrf_token': csrf_token,
|
|
|
'delete_svn_pattern': branch.ui_id
|
|
|
}
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data,
|
|
|
headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
|
|
|
|
|
|
def test_delete_svn_branch_raises_400_when_not_found(
|
|
|
self, autologin_user, backend_svn, settings_util, csrf_token):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
data = {
|
|
|
'_method': 'delete',
|
|
|
'delete_svn_pattern': 123,
|
|
|
'csrf_token': csrf_token
|
|
|
}
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data,
|
|
|
headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
|
|
|
|
|
|
def test_delete_svn_branch_raises_400_when_no_id_specified(
|
|
|
self, autologin_user, backend_svn, settings_util, csrf_token):
|
|
|
repo_name = backend_svn.repo_name
|
|
|
data = {
|
|
|
'_method': 'delete',
|
|
|
'csrf_token': csrf_token
|
|
|
}
|
|
|
self.app.post(
|
|
|
url('repo_vcs_settings', repo_name=repo_name), data,
|
|
|
headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
|
|
|
|
|
|
def _cleanup_repo_settings(self, settings_model):
|
|
|
cleanup = []
|
|
|
ui_settings = (
|
|
|
VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
|
|
|
|
|
|
for section, key in ui_settings:
|
|
|
ui = settings_model.get_ui_by_section_and_key(section, key)
|
|
|
if ui:
|
|
|
cleanup.append(ui)
|
|
|
|
|
|
cleanup.extend(settings_model.get_ui_by_section(
|
|
|
VcsSettingsModel.INHERIT_SETTINGS))
|
|
|
cleanup.extend(settings_model.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_BRANCH_SECTION))
|
|
|
cleanup.extend(settings_model.get_ui_by_section(
|
|
|
VcsSettingsModel.SVN_TAG_SECTION))
|
|
|
|
|
|
for name in VcsSettingsModel.GENERAL_SETTINGS:
|
|
|
setting = settings_model.get_setting_by_name(name)
|
|
|
if setting:
|
|
|
cleanup.append(setting)
|
|
|
|
|
|
for object_ in cleanup:
|
|
|
Session().delete(object_)
|
|
|
Session().commit()
|
|
|
|
|
|
def assert_repo_value_equals_global_value(self, response, setting):
|
|
|
assert_response = AssertResponse(response)
|
|
|
global_css_selector = '[name={}_inherited]'.format(setting)
|
|
|
repo_css_selector = '[name={}]'.format(setting)
|
|
|
repo_element = assert_response.get_element(repo_css_selector)
|
|
|
global_element = assert_response.get_element(global_css_selector)
|
|
|
assert repo_element.value == global_element.value
|
|
|
|
|
|
|
|
|
def _get_permission_for_user(user, repo):
|
|
|
perm = UserRepoToPerm.query()\
|
|
|
.filter(UserRepoToPerm.repository ==
|
|
|
Repository.get_by_repo_name(repo))\
|
|
|
.filter(UserRepoToPerm.user == User.get_by_username(user))\
|
|
|
.all()
|
|
|
return perm
|
|
|
|