test_admin_repos.py
515 lines
| 20.5 KiB
| text/x-python
|
PythonLexer
r2014 | ||||
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r2014 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r5087 | import urllib.request | |||
import urllib.parse | ||||
import urllib.error | ||||
r2014 | ||||
import mock | ||||
import pytest | ||||
from rhodecode.apps._base import ADMIN_PREFIX | ||||
from rhodecode.lib import auth | ||||
from rhodecode.lib.utils2 import safe_str | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.model.db import ( | ||||
Repository, RepoGroup, UserRepoToPerm, User, Permission) | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.repo_group import RepoGroupModel | ||||
from rhodecode.model.user import UserModel | ||||
from rhodecode.tests import ( | ||||
login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN, | ||||
TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) | ||||
from rhodecode.tests.fixture import Fixture, error_function | ||||
from rhodecode.tests.utils import AssertResponse, repo_on_filesystem | ||||
fixture = Fixture() | ||||
def route_path(name, params=None, **kwargs): | ||||
r5087 | import urllib.request | |||
import urllib.parse | ||||
import urllib.error | ||||
r2014 | ||||
base_url = { | ||||
'repos': ADMIN_PREFIX + '/repos', | ||||
r4151 | 'repos_data': ADMIN_PREFIX + '/repos_data', | |||
r2014 | 'repo_new': ADMIN_PREFIX + '/repos/new', | |||
'repo_create': ADMIN_PREFIX + '/repos/create', | ||||
'repo_creating_check': '/{repo_name}/repo_creating_check', | ||||
}[name].format(**kwargs) | ||||
if params: | ||||
r4914 | base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params)) | |||
r2014 | return base_url | |||
def _get_permission_for_user(user, repo): | ||||
perm = UserRepoToPerm.query()\ | ||||
.filter(UserRepoToPerm.repository == | ||||
Repository.get_by_repo_name(repo))\ | ||||
.filter(UserRepoToPerm.user == User.get_by_username(user))\ | ||||
.all() | ||||
return perm | ||||
@pytest.mark.usefixtures("app") | ||||
class TestAdminRepos(object): | ||||
r4151 | def test_repo_list(self, autologin_user, user_util, xhr_header): | |||
r2014 | repo = user_util.create_repo() | |||
r2786 | repo_name = repo.repo_name | |||
r2014 | response = self.app.get( | |||
r4151 | route_path('repos_data'), status=200, | |||
extra_environ=xhr_header) | ||||
r2014 | ||||
r2786 | response.mustcontain(repo_name) | |||
r2014 | ||||
def test_create_page_restricted_to_single_backend(self, autologin_user, backend): | ||||
with mock.patch('rhodecode.BACKENDS', {'git': 'git'}): | ||||
response = self.app.get(route_path('repo_new'), status=200) | ||||
r3981 | assert_response = response.assert_response() | |||
r4330 | element = assert_response.get_element('[name=repo_type]') | |||
assert element.get('value') == 'git' | ||||
r2014 | ||||
def test_create_page_non_restricted_backends(self, autologin_user, backend): | ||||
response = self.app.get(route_path('repo_new'), status=200) | ||||
r3981 | assert_response = response.assert_response() | |||
r4330 | assert ['hg', 'git', 'svn'] == [x.get('value') for x in assert_response.get_elements('[name=repo_type]')] | |||
r2014 | ||||
@pytest.mark.parametrize( | ||||
r5087 | "suffix", ['', 'xxa'], ids=['', 'non-ascii']) | |||
r2014 | def test_create(self, autologin_user, backend, suffix, csrf_token): | |||
repo_name_unicode = backend.new_repo_name(suffix=suffix) | ||||
r5087 | repo_name = repo_name_unicode | |||
description_unicode = 'description for newly created repo' + suffix | ||||
description = description_unicode | ||||
r2014 | response = self.app.post( | |||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
csrf_token=csrf_token), | ||||
status=302) | ||||
self.assert_repository_is_created_correctly( | ||||
repo_name, description, backend) | ||||
def test_create_numeric_name(self, autologin_user, backend, csrf_token): | ||||
numeric_repo = '1234' | ||||
repo_name = numeric_repo | ||||
description = 'description for newly created repo' + numeric_repo | ||||
self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
csrf_token=csrf_token)) | ||||
self.assert_repository_is_created_correctly( | ||||
repo_name, description, backend) | ||||
r5087 | @pytest.mark.parametrize("suffix", ['', '_ąćę'], ids=['', 'non-ascii']) | |||
r2014 | def test_create_in_group( | |||
self, autologin_user, backend, suffix, csrf_token): | ||||
# create GROUP | ||||
r5087 | group_name = f'sometest_{backend.alias}' | |||
r2014 | gr = RepoGroupModel().create(group_name=group_name, | |||
group_description='test', | ||||
owner=TEST_USER_ADMIN_LOGIN) | ||||
Session().commit() | ||||
r5087 | repo_name = f'ingroup{suffix}' | |||
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) | ||||
description = 'description for newly created repo' | ||||
r2014 | self.app.post( | |||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=safe_str(repo_name), | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
repo_group=gr.group_id, | ||||
csrf_token=csrf_token)) | ||||
# TODO: johbo: Cleanup work to fixture | ||||
try: | ||||
self.assert_repository_is_created_correctly( | ||||
repo_name_full, description, backend) | ||||
new_repo = RepoModel().get_by_repo_name(repo_name_full) | ||||
inherited_perms = UserRepoToPerm.query().filter( | ||||
UserRepoToPerm.repository_id == new_repo.repo_id).all() | ||||
assert len(inherited_perms) == 1 | ||||
finally: | ||||
RepoModel().delete(repo_name_full) | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
def test_create_in_group_numeric_name( | ||||
self, autologin_user, backend, csrf_token): | ||||
# create GROUP | ||||
group_name = 'sometest_%s' % backend.alias | ||||
gr = RepoGroupModel().create(group_name=group_name, | ||||
group_description='test', | ||||
owner=TEST_USER_ADMIN_LOGIN) | ||||
Session().commit() | ||||
repo_name = '12345' | ||||
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) | ||||
description = 'description for newly created repo' | ||||
self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
repo_group=gr.group_id, | ||||
csrf_token=csrf_token)) | ||||
# TODO: johbo: Cleanup work to fixture | ||||
try: | ||||
self.assert_repository_is_created_correctly( | ||||
repo_name_full, description, backend) | ||||
new_repo = RepoModel().get_by_repo_name(repo_name_full) | ||||
inherited_perms = UserRepoToPerm.query()\ | ||||
.filter(UserRepoToPerm.repository_id == new_repo.repo_id).all() | ||||
assert len(inherited_perms) == 1 | ||||
finally: | ||||
RepoModel().delete(repo_name_full) | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
def test_create_in_group_without_needed_permissions(self, backend): | ||||
session = login_user_session( | ||||
self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) | ||||
csrf_token = auth.get_csrf_token(session) | ||||
# revoke | ||||
user_model = UserModel() | ||||
# disable fork and create on default user | ||||
user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') | ||||
user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') | ||||
user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') | ||||
user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') | ||||
# disable on regular user | ||||
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') | ||||
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') | ||||
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') | ||||
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') | ||||
Session().commit() | ||||
# create GROUP | ||||
group_name = 'reg_sometest_%s' % backend.alias | ||||
gr = RepoGroupModel().create(group_name=group_name, | ||||
group_description='test', | ||||
owner=TEST_USER_ADMIN_LOGIN) | ||||
Session().commit() | ||||
r2786 | repo_group_id = gr.group_id | |||
r2014 | ||||
group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias | ||||
gr_allowed = RepoGroupModel().create( | ||||
group_name=group_name_allowed, | ||||
group_description='test', | ||||
owner=TEST_USER_REGULAR_LOGIN) | ||||
r2786 | allowed_repo_group_id = gr_allowed.group_id | |||
r2014 | Session().commit() | |||
repo_name = 'ingroup' | ||||
description = 'description for newly created repo' | ||||
response = self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
r2786 | repo_group=repo_group_id, | |||
r2014 | csrf_token=csrf_token)) | |||
response.mustcontain('Invalid value') | ||||
# user is allowed to create in this group | ||||
repo_name = 'ingroup' | ||||
repo_name_full = RepoGroup.url_sep().join( | ||||
[group_name_allowed, repo_name]) | ||||
description = 'description for newly created repo' | ||||
response = self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
r2786 | repo_group=allowed_repo_group_id, | |||
r2014 | csrf_token=csrf_token)) | |||
# TODO: johbo: Cleanup in pytest fixture | ||||
try: | ||||
self.assert_repository_is_created_correctly( | ||||
repo_name_full, description, backend) | ||||
new_repo = RepoModel().get_by_repo_name(repo_name_full) | ||||
inherited_perms = UserRepoToPerm.query().filter( | ||||
UserRepoToPerm.repository_id == new_repo.repo_id).all() | ||||
assert len(inherited_perms) == 1 | ||||
assert repo_on_filesystem(repo_name_full) | ||||
finally: | ||||
RepoModel().delete(repo_name_full) | ||||
RepoGroupModel().delete(group_name) | ||||
RepoGroupModel().delete(group_name_allowed) | ||||
Session().commit() | ||||
def test_create_in_group_inherit_permissions(self, autologin_user, backend, | ||||
csrf_token): | ||||
# create GROUP | ||||
group_name = 'sometest_%s' % backend.alias | ||||
gr = RepoGroupModel().create(group_name=group_name, | ||||
group_description='test', | ||||
owner=TEST_USER_ADMIN_LOGIN) | ||||
perm = Permission.get_by_key('repository.write') | ||||
RepoGroupModel().grant_user_permission( | ||||
gr, TEST_USER_REGULAR_LOGIN, perm) | ||||
# add repo permissions | ||||
Session().commit() | ||||
r2786 | repo_group_id = gr.group_id | |||
r2014 | repo_name = 'ingroup_inherited_%s' % backend.alias | |||
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) | ||||
description = 'description for newly created repo' | ||||
self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
r2786 | repo_group=repo_group_id, | |||
r2014 | repo_copy_permissions=True, | |||
csrf_token=csrf_token)) | ||||
# TODO: johbo: Cleanup to pytest fixture | ||||
try: | ||||
self.assert_repository_is_created_correctly( | ||||
repo_name_full, description, backend) | ||||
except Exception: | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
raise | ||||
# check if inherited permissions are applied | ||||
new_repo = RepoModel().get_by_repo_name(repo_name_full) | ||||
inherited_perms = UserRepoToPerm.query().filter( | ||||
UserRepoToPerm.repository_id == new_repo.repo_id).all() | ||||
assert len(inherited_perms) == 2 | ||||
assert TEST_USER_REGULAR_LOGIN in [ | ||||
x.user.username for x in inherited_perms] | ||||
assert 'repository.write' in [ | ||||
x.permission.permission_name for x in inherited_perms] | ||||
RepoModel().delete(repo_name_full) | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
@pytest.mark.xfail_backends( | ||||
"git", "hg", reason="Missing reposerver support") | ||||
def test_create_with_clone_uri(self, autologin_user, backend, reposerver, | ||||
csrf_token): | ||||
source_repo = backend.create_repo(number_of_commits=2) | ||||
source_repo_name = source_repo.repo_name | ||||
reposerver.serve(source_repo.scm_instance()) | ||||
repo_name = backend.new_repo_name() | ||||
response = self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description='', | ||||
clone_uri=reposerver.url, | ||||
csrf_token=csrf_token), | ||||
status=302) | ||||
# Should be redirected to the creating page | ||||
response.mustcontain('repo_creating') | ||||
# Expecting that both repositories have same history | ||||
source_repo = RepoModel().get_by_repo_name(source_repo_name) | ||||
source_vcs = source_repo.scm_instance() | ||||
repo = RepoModel().get_by_repo_name(repo_name) | ||||
repo_vcs = repo.scm_instance() | ||||
assert source_vcs[0].message == repo_vcs[0].message | ||||
assert source_vcs.count() == repo_vcs.count() | ||||
assert source_vcs.commit_ids == repo_vcs.commit_ids | ||||
@pytest.mark.xfail_backends("svn", reason="Depends on import support") | ||||
def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend, | ||||
csrf_token): | ||||
repo_name = backend.new_repo_name() | ||||
description = 'description for newly created repo' | ||||
response = self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
clone_uri='http://repo.invalid/repo', | ||||
csrf_token=csrf_token)) | ||||
response.mustcontain('invalid clone url') | ||||
@pytest.mark.xfail_backends("svn", reason="Depends on import support") | ||||
def test_create_remote_repo_wrong_clone_uri_hg_svn( | ||||
self, autologin_user, backend, csrf_token): | ||||
repo_name = backend.new_repo_name() | ||||
description = 'description for newly created repo' | ||||
response = self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
clone_uri='svn+http://svn.invalid/repo', | ||||
csrf_token=csrf_token)) | ||||
response.mustcontain('invalid clone url') | ||||
def test_create_with_git_suffix( | ||||
self, autologin_user, backend, csrf_token): | ||||
repo_name = backend.new_repo_name() + ".git" | ||||
description = 'description for newly created repo' | ||||
response = self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
csrf_token=csrf_token)) | ||||
response.mustcontain('Repository name cannot end with .git') | ||||
def test_default_user_cannot_access_private_repo_in_a_group( | ||||
self, autologin_user, user_util, backend): | ||||
group = user_util.create_repo_group() | ||||
repo = backend.create_repo( | ||||
repo_private=True, repo_group=group, repo_copy_permissions=True) | ||||
permissions = _get_permission_for_user( | ||||
user='default', repo=repo.repo_name) | ||||
assert len(permissions) == 1 | ||||
assert permissions[0].permission.permission_name == 'repository.none' | ||||
assert permissions[0].repository.private is True | ||||
def test_create_on_top_level_without_permissions(self, backend): | ||||
session = login_user_session( | ||||
self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) | ||||
csrf_token = auth.get_csrf_token(session) | ||||
# revoke | ||||
user_model = UserModel() | ||||
# disable fork and create on default user | ||||
user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') | ||||
user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') | ||||
user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') | ||||
user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') | ||||
# disable on regular user | ||||
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') | ||||
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') | ||||
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') | ||||
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') | ||||
Session().commit() | ||||
repo_name = backend.new_repo_name() | ||||
description = 'description for newly created repo' | ||||
response = self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
csrf_token=csrf_token)) | ||||
response.mustcontain( | ||||
u"You do not have the permission to store repositories in " | ||||
u"the root location.") | ||||
@mock.patch.object(RepoModel, '_create_filesystem_repo', error_function) | ||||
def test_create_repo_when_filesystem_op_fails( | ||||
self, autologin_user, backend, csrf_token): | ||||
repo_name = backend.new_repo_name() | ||||
description = 'description for newly created repo' | ||||
response = self.app.post( | ||||
route_path('repo_create'), | ||||
fixture._get_repo_create_params( | ||||
repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=backend.alias, | ||||
repo_description=description, | ||||
csrf_token=csrf_token)) | ||||
assert_session_flash( | ||||
response, 'Error creating repository %s' % repo_name) | ||||
# repo must not be in db | ||||
assert backend.repo is None | ||||
# repo must not be in filesystem ! | ||||
assert not repo_on_filesystem(repo_name) | ||||
r5087 | def assert_repository_is_created_correctly(self, repo_name, description, backend): | |||
url_quoted_repo_name = urllib.parse.quote(repo_name) | ||||
r2014 | ||||
# run the check page that triggers the flash message | ||||
response = self.app.get( | ||||
r5087 | route_path('repo_creating_check', repo_name=repo_name)) | |||
assert response.json == {'result': True} | ||||
r2014 | ||||
r5087 | flash_msg = 'Created repository <a href="/{}">{}</a>'.format(url_quoted_repo_name, repo_name) | |||
r2014 | assert_session_flash(response, flash_msg) | |||
# test if the repo was created in the database | ||||
new_repo = RepoModel().get_by_repo_name(repo_name) | ||||
assert new_repo.repo_name == repo_name | ||||
assert new_repo.description == description | ||||
# test if the repository is visible in the list ? | ||||
response = self.app.get( | ||||
r5087 | h.route_path('repo_summary', repo_name=repo_name)) | |||
r2014 | response.mustcontain(repo_name) | |||
response.mustcontain(backend.alias) | ||||
assert repo_on_filesystem(repo_name) | ||||