# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import urllib.request
import urllib.parse
import urllib.error
import mock
import pytest
from rhodecode.apps._base import ADMIN_PREFIX
from rhodecode.lib import auth
from rhodecode.lib.utils2 import safe_str
from rhodecode.lib import helpers as h
from rhodecode.model.db import (
Repository, RepoGroup, UserRepoToPerm, User, Permission)
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.user import UserModel
from rhodecode.tests import (
login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN,
TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
from rhodecode.tests.fixtures.rc_fixture import Fixture, error_function
from rhodecode.tests.utils import repo_on_filesystem
from rhodecode.tests.routes import route_path
# Module-wide test-data helper; the tests below use it to build the form
# parameters that get posted to the ``repo_create`` route.
fixture = Fixture()
def _get_permission_for_user(user, repo):
    """Return all ``UserRepoToPerm`` rows that link ``user`` to ``repo``.

    :param user: username, resolved through ``User.get_by_username``.
    :param repo: repository name, resolved through
        ``Repository.get_by_repo_name``.
    :returns: list produced by ``.all()`` on the filtered query.
    """
    # Narrow by repository first, then by user; both lookups happen lazily
    # at the point where the respective filter is built.
    for_repo = UserRepoToPerm.query().filter(
        UserRepoToPerm.repository == Repository.get_by_repo_name(repo))
    for_repo_and_user = for_repo.filter(
        UserRepoToPerm.user == User.get_by_username(user))
    return for_repo_and_user.all()
@pytest.mark.usefixtures("app")
class TestAdminRepos:
    """Functional tests for the admin repository listing and creation views.

    All requests go through ``self.app``, which is presumably attached by the
    ``app`` fixture requested via ``usefixtures`` (not visible in this file).
    """

    # ------------------------------------------------------------------
    # private helpers (leading underscore keeps pytest from collecting them)
    # ------------------------------------------------------------------

    def _post_repo_create(self, backend, csrf_token, repo_name, description,
                          status=None, **form_params):
        """POST the repository creation form and return the response.

        :param backend: backend fixture; its ``alias`` is sent as ``repo_type``.
        :param csrf_token: CSRF token of the logged-in session.
        :param repo_name: value for the ``repo_name`` form field.
        :param description: value for the ``repo_description`` form field.
        :param status: expected HTTP status; only forwarded when given.
        :param form_params: extra form fields (``repo_group``, ``clone_uri``,
            ``repo_copy_permissions`` ...).
        """
        params = fixture._get_repo_create_params(
            repo_private=False,
            repo_name=repo_name,
            repo_type=backend.alias,
            repo_description=description,
            csrf_token=csrf_token,
            **form_params)
        # Forward ``status`` only when a test pinned one, so that the test
        # app's own default status handling applies in every other case.
        post_kwargs = {} if status is None else {'status': status}
        return self.app.post(route_path('repo_create'), params, **post_kwargs)

    @staticmethod
    def _disable_create_and_fork_permissions():
        """Replace the create/fork grants by their ``none`` variants.

        Applied first to the default user and then to the regular test user;
        the change is committed before returning.
        """
        user_model = UserModel()
        for username in (User.DEFAULT_USER, TEST_USER_REGULAR_LOGIN):
            user_model.revoke_perm(username, 'hg.create.repository')
            user_model.grant_perm(username, 'hg.create.none')
            user_model.revoke_perm(username, 'hg.fork.repository')
            user_model.grant_perm(username, 'hg.fork.none')
        Session().commit()

    @staticmethod
    def _user_permissions_on(repo_name):
        """Return all ``UserRepoToPerm`` rows attached to ``repo_name``."""
        repo = RepoModel().get_by_repo_name(repo_name)
        return UserRepoToPerm.query().filter(
            UserRepoToPerm.repository_id == repo.repo_id).all()

    @staticmethod
    def _delete_repo_and_groups(repo_name, *group_names):
        """Delete a repository plus the given repo groups and commit.

        Meant for ``finally`` blocks, so the repository may not exist when
        this runs -- assumes ``RepoModel.delete`` tolerates that; TODO confirm.
        """
        # TODO: johbo: Cleanup work to fixture
        RepoModel().delete(repo_name)
        for group_name in group_names:
            RepoGroupModel().delete(group_name)
        Session().commit()

    # ------------------------------------------------------------------
    # listing / "new repository" page
    # ------------------------------------------------------------------

    def test_repo_list(self, autologin_user, user_util, xhr_header):
        """The ``repos_data`` endpoint mentions a freshly created repo."""
        repo_name = user_util.create_repo().repo_name
        response = self.app.get(
            route_path('repos_data'), status=200,
            extra_environ=xhr_header)
        response.mustcontain(repo_name)

    def test_create_page_restricted_to_single_backend(
            self, autologin_user, backend):
        """With ``BACKENDS`` patched to git only, ``repo_type`` is ``git``."""
        with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
            response = self.app.get(route_path('repo_new'), status=200)
        assert_response = response.assert_response()
        element = assert_response.get_element('[name=repo_type]')
        assert element.get('value') == 'git'

    def test_create_page_non_restricted_backends(
            self, autologin_user, backend):
        """Without patching, all three backends are offered, in order."""
        response = self.app.get(route_path('repo_new'), status=200)
        assert_response = response.assert_response()
        offered = [
            element.get('value')
            for element in assert_response.get_elements('[name=repo_type]')]
        assert offered == ['hg', 'git', 'svn']

    # ------------------------------------------------------------------
    # plain creation
    # ------------------------------------------------------------------

    # NOTE(review): the 'non-ascii' id is paired with the ASCII suffix 'xxa';
    # confirm whether a non-ASCII suffix was intended here.
    @pytest.mark.parametrize(
        "suffix", ['', 'xxa'], ids=['', 'non-ascii'])
    def test_create(self, autologin_user, backend, suffix, csrf_token):
        """Creating a top-level repo redirects (302) and the repo exists."""
        repo_name = backend.new_repo_name(suffix=suffix)
        description = 'description for newly created repo' + suffix
        self._post_repo_create(
            backend, csrf_token, repo_name, description, status=302)
        self.assert_repository_is_created_correctly(
            repo_name, description, backend)

    def test_create_numeric_name(self, autologin_user, backend, csrf_token):
        """A purely numeric repository name is accepted."""
        repo_name = '1234'
        description = 'description for newly created repo' + repo_name
        self._post_repo_create(backend, csrf_token, repo_name, description)
        self.assert_repository_is_created_correctly(
            repo_name, description, backend)

    # ------------------------------------------------------------------
    # creation inside repository groups
    # ------------------------------------------------------------------

    @pytest.mark.parametrize("suffix", ['', '_ąćę'], ids=['', 'non-ascii'])
    def test_create_in_group(
            self, autologin_user, backend, suffix, csrf_token):
        """A repo created inside a group gets exactly one user permission."""
        # create GROUP
        group_name = f'sometest_{backend.alias}'
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        Session().commit()

        repo_name = f'ingroup{suffix}'
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        # The request lives inside the ``try`` so the group is removed even
        # when the POST itself blows up.
        try:
            self._post_repo_create(
                backend, csrf_token, safe_str(repo_name), description,
                repo_group=gr.group_id)
            self.assert_repository_is_created_correctly(
                repo_name_full, description, backend)
            assert len(self._user_permissions_on(repo_name_full)) == 1
        finally:
            self._delete_repo_and_groups(repo_name_full, group_name)

    def test_create_in_group_numeric_name(
            self, autologin_user, backend, csrf_token):
        """A numeric repo name also works inside a repository group."""
        # create GROUP
        group_name = f'sometest_{backend.alias}'
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        Session().commit()

        repo_name = '12345'
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        try:
            self._post_repo_create(
                backend, csrf_token, repo_name, description,
                repo_group=gr.group_id)
            self.assert_repository_is_created_correctly(
                repo_name_full, description, backend)
            assert len(self._user_permissions_on(repo_name_full)) == 1
        finally:
            self._delete_repo_and_groups(repo_name_full, group_name)

    def test_create_in_group_without_needed_permissions(self, backend):
        """A regular user can only create inside a group they own.

        Posting into the admin-owned group must yield 'Invalid value'; posting
        into the group owned by the regular user must create the repository.
        """
        session = login_user_session(
            self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
        csrf_token = auth.get_csrf_token(session)

        # revoke create/fork on the default user and on the regular user
        self._disable_create_and_fork_permissions()

        # create GROUP owned by the admin -> regular user must be rejected
        group_name = f'reg_sometest_{backend.alias}'
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        Session().commit()
        repo_group_id = gr.group_id

        # create GROUP owned by the regular user -> creation is allowed
        group_name_allowed = f'reg_sometest_allowed_{backend.alias}'
        gr_allowed = RepoGroupModel().create(
            group_name=group_name_allowed,
            group_description='test',
            owner=TEST_USER_REGULAR_LOGIN)
        allowed_repo_group_id = gr_allowed.group_id
        Session().commit()

        repo_name = 'ingroup'
        repo_name_full = RepoGroup.url_sep().join(
            [group_name_allowed, repo_name])
        description = 'description for newly created repo'
        # TODO: johbo: Cleanup in pytest fixture
        try:
            response = self._post_repo_create(
                backend, csrf_token, repo_name, description,
                repo_group=repo_group_id)
            response.mustcontain('Invalid value')

            # user is allowed to create in this group
            self._post_repo_create(
                backend, csrf_token, repo_name, description,
                repo_group=allowed_repo_group_id)
            self.assert_repository_is_created_correctly(
                repo_name_full, description, backend)
            assert len(self._user_permissions_on(repo_name_full)) == 1
            assert repo_on_filesystem(repo_name_full)
        finally:
            self._delete_repo_and_groups(
                repo_name_full, group_name, group_name_allowed)

    def test_create_in_group_inherit_permissions(self, autologin_user, backend,
                                                 csrf_token):
        """``repo_copy_permissions`` copies the group's user permissions."""
        # create GROUP
        group_name = f'sometest_{backend.alias}'
        gr = RepoGroupModel().create(group_name=group_name,
                                     group_description='test',
                                     owner=TEST_USER_ADMIN_LOGIN)
        perm = Permission.get_by_key('repository.write')
        RepoGroupModel().grant_user_permission(
            gr, TEST_USER_REGULAR_LOGIN, perm)

        # add repo permissions
        Session().commit()
        repo_group_id = gr.group_id

        repo_name = f'ingroup_inherited_{backend.alias}'
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
        description = 'description for newly created repo'
        # ``finally`` guarantees that repo and group are removed even when
        # one of the permission assertions below fails.
        try:
            self._post_repo_create(
                backend, csrf_token, repo_name, description,
                repo_group=repo_group_id,
                repo_copy_permissions=True)
            self.assert_repository_is_created_correctly(
                repo_name_full, description, backend)

            # check if inherited permissions are applied
            inherited_perms = self._user_permissions_on(repo_name_full)
            assert len(inherited_perms) == 2
            assert TEST_USER_REGULAR_LOGIN in [
                x.user.username for x in inherited_perms]
            assert 'repository.write' in [
                x.permission.permission_name for x in inherited_perms]
        finally:
            self._delete_repo_and_groups(repo_name_full, group_name)

    # ------------------------------------------------------------------
    # creation from a remote / validation errors
    # ------------------------------------------------------------------

    @pytest.mark.xfail_backends(
        "git", "hg", reason="Missing reposerver support")
    def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
                                   csrf_token):
        """A repo created from a clone URI has the same history as its source."""
        source_repo = backend.create_repo(number_of_commits=2)
        source_repo_name = source_repo.repo_name
        reposerver.serve(source_repo.scm_instance())

        repo_name = backend.new_repo_name()
        response = self._post_repo_create(
            backend, csrf_token, repo_name, '',
            status=302,
            clone_uri=reposerver.url)

        # Should be redirected to the creating page
        response.mustcontain('repo_creating')

        # Expecting that both repositories have same history
        source_vcs = RepoModel().get_by_repo_name(
            source_repo_name).scm_instance()
        repo_vcs = RepoModel().get_by_repo_name(repo_name).scm_instance()
        assert source_vcs[0].message == repo_vcs[0].message
        assert source_vcs.count() == repo_vcs.count()
        assert source_vcs.commit_ids == repo_vcs.commit_ids

    @pytest.mark.xfail_backends("svn", reason="Depends on import support")
    def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
                                                csrf_token):
        """An unreachable http clone URI is reported as invalid."""
        response = self._post_repo_create(
            backend, csrf_token, backend.new_repo_name(),
            'description for newly created repo',
            clone_uri='http://repo.invalid/repo')
        response.mustcontain('invalid clone url')

    @pytest.mark.xfail_backends("svn", reason="Depends on import support")
    def test_create_remote_repo_wrong_clone_uri_hg_svn(
            self, autologin_user, backend, csrf_token):
        """An unreachable ``svn+http`` clone URI is reported as invalid."""
        response = self._post_repo_create(
            backend, csrf_token, backend.new_repo_name(),
            'description for newly created repo',
            clone_uri='svn+http://svn.invalid/repo')
        response.mustcontain('invalid clone url')

    def test_create_with_git_suffix(
            self, autologin_user, backend, csrf_token):
        """Repository names ending in ``.git`` are rejected by the form."""
        response = self._post_repo_create(
            backend, csrf_token, backend.new_repo_name() + ".git",
            'description for newly created repo')
        response.mustcontain('Repository name cannot end with .git')

    # ------------------------------------------------------------------
    # permissions
    # ------------------------------------------------------------------

    def test_default_user_cannot_access_private_repo_in_a_group(
            self, autologin_user, user_util, backend):
        """The default user gets ``repository.none`` on a private group repo."""
        group = user_util.create_repo_group()
        repo = backend.create_repo(
            repo_private=True, repo_group=group, repo_copy_permissions=True)

        permissions = _get_permission_for_user(
            user='default', repo=repo.repo_name)
        assert len(permissions) == 1
        assert permissions[0].permission.permission_name == 'repository.none'
        assert permissions[0].repository.private is True

    def test_create_on_top_level_without_permissions(self, backend):
        """Without create permission, a root-level repo cannot be stored."""
        session = login_user_session(
            self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
        csrf_token = auth.get_csrf_token(session)

        # revoke create/fork on the default user and on the regular user
        self._disable_create_and_fork_permissions()

        response = self._post_repo_create(
            backend, csrf_token, backend.new_repo_name(),
            'description for newly created repo')
        response.mustcontain(
            "You do not have the permission to store repositories in "
            "the root location.")

    # ------------------------------------------------------------------
    # failure handling
    # ------------------------------------------------------------------

    @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
    def test_create_repo_when_filesystem_op_fails(
            self, autologin_user, backend, csrf_token):
        """A failing filesystem step leaves no repo in the db or on disk."""
        repo_name = backend.new_repo_name()
        response = self._post_repo_create(
            backend, csrf_token, repo_name,
            'description for newly created repo')

        assert_session_flash(
            response, f'Error creating repository {repo_name}')
        # repo must not be in db
        assert backend.repo is None
        # repo must not be in filesystem !
        assert not repo_on_filesystem(repo_name)

    # ------------------------------------------------------------------
    # shared assertion
    # ------------------------------------------------------------------

    def assert_repository_is_created_correctly(
            self, repo_name, description, backend):
        """Verify a created repository via flash, db, summary page and disk.

        :param repo_name: full repository name (including any group prefix).
        :param description: description the repository was created with.
        :param backend: backend fixture; its ``alias`` must show up on the
            summary page.
        """
        url_quoted_repo_name = urllib.parse.quote(repo_name)

        # run the check page that triggers the flash message
        response = self.app.get(
            route_path('repo_creating_check', repo_name=repo_name))
        assert response.json == {'result': True}

        # The message links to the repository: URL-quoted name in the href,
        # raw name as the link text.
        # NOTE(review): the anchor markup was reconstructed to match the two
        # format arguments -- confirm against the flash message that the
        # ``repo_creating_check`` view really emits.
        flash_msg = 'Created repository <a href="/{}">{}</a>'.format(
            url_quoted_repo_name, repo_name)
        assert_session_flash(response, flash_msg)

        # test if the repo was created in the database
        new_repo = RepoModel().get_by_repo_name(repo_name)
        assert new_repo.repo_name == repo_name
        assert new_repo.description == description

        # test if the repository is visible in the list ?
        response = self.app.get(
            h.route_path('repo_summary', repo_name=repo_name))
        response.mustcontain(repo_name)
        response.mustcontain(backend.alias)

        assert repo_on_filesystem(repo_name)