|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
from rhodecode.tests import (
|
|
|
TestController, assert_session_flash, TEST_USER_ADMIN_LOGIN)
|
|
|
from rhodecode.model.db import UserGroup
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.tests.fixture import Fixture
|
|
|
|
|
|
fixture = Fixture()
|
|
|
|
|
|
|
|
|
def route_path(name, params=None, **kwargs):
|
|
|
import urllib.request
|
|
|
import urllib.parse
|
|
|
import urllib.error
|
|
|
from rhodecode.apps._base import ADMIN_PREFIX
|
|
|
|
|
|
base_url = {
|
|
|
'user_groups': ADMIN_PREFIX + '/user_groups',
|
|
|
'user_groups_data': ADMIN_PREFIX + '/user_groups_data',
|
|
|
'user_group_members_data': ADMIN_PREFIX + '/user_groups/{user_group_id}/members',
|
|
|
'user_groups_new': ADMIN_PREFIX + '/user_groups/new',
|
|
|
'user_groups_create': ADMIN_PREFIX + '/user_groups/create',
|
|
|
'edit_user_group': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit',
|
|
|
'edit_user_group_advanced_sync': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/advanced/sync',
|
|
|
'edit_user_group_global_perms_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/global_permissions/update',
|
|
|
'user_groups_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/update',
|
|
|
'user_groups_delete': ADMIN_PREFIX + '/user_groups/{user_group_id}/delete',
|
|
|
|
|
|
}[name].format(**kwargs)
|
|
|
|
|
|
if params:
|
|
|
base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
|
|
|
return base_url
|
|
|
|
|
|
|
|
|
class TestUserGroupsView(TestController):
|
|
|
|
|
|
def test_set_synchronization(self, user_util):
|
|
|
self.log_user()
|
|
|
user_group_name = user_util.create_user_group().users_group_name
|
|
|
|
|
|
group = Session().query(UserGroup).filter(
|
|
|
UserGroup.users_group_name == user_group_name).one()
|
|
|
|
|
|
assert group.group_data.get('extern_type') is None
|
|
|
|
|
|
# enable
|
|
|
self.app.post(
|
|
|
route_path('edit_user_group_advanced_sync',
|
|
|
user_group_id=group.users_group_id),
|
|
|
params={'csrf_token': self.csrf_token}, status=302)
|
|
|
|
|
|
group = Session().query(UserGroup).filter(
|
|
|
UserGroup.users_group_name == user_group_name).one()
|
|
|
assert group.group_data.get('extern_type') == 'manual'
|
|
|
assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN
|
|
|
|
|
|
# disable
|
|
|
self.app.post(
|
|
|
route_path('edit_user_group_advanced_sync',
|
|
|
user_group_id=group.users_group_id),
|
|
|
params={'csrf_token': self.csrf_token}, status=302)
|
|
|
|
|
|
group = Session().query(UserGroup).filter(
|
|
|
UserGroup.users_group_name == user_group_name).one()
|
|
|
assert group.group_data.get('extern_type') is None
|
|
|
assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN
|
|
|
|
|
|
def test_delete_user_group(self, user_util):
|
|
|
self.log_user()
|
|
|
user_group_id = user_util.create_user_group().users_group_id
|
|
|
|
|
|
group = Session().query(UserGroup).filter(
|
|
|
UserGroup.users_group_id == user_group_id).one()
|
|
|
|
|
|
self.app.post(
|
|
|
route_path('user_groups_delete', user_group_id=group.users_group_id),
|
|
|
params={'csrf_token': self.csrf_token})
|
|
|
|
|
|
group = Session().query(UserGroup).filter(
|
|
|
UserGroup.users_group_id == user_group_id).scalar()
|
|
|
|
|
|
assert group is None
|
|
|
|
|
|
@pytest.mark.parametrize('repo_create, repo_create_write, user_group_create, repo_group_create, fork_create, inherit_default_permissions, expect_error, expect_form_error', [
|
|
|
('hg.create.none', 'hg.create.write_on_repogroup.false', 'hg.usergroup.create.false', 'hg.repogroup.create.false', 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
|
|
|
('hg.create.repository', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, False),
|
|
|
('hg.create.XXX', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, True),
|
|
|
('', '', '', '', '', '', True, False),
|
|
|
])
|
|
|
def test_global_permissions_on_user_group(
|
|
|
self, repo_create, repo_create_write, user_group_create,
|
|
|
repo_group_create, fork_create, expect_error, expect_form_error,
|
|
|
inherit_default_permissions, user_util):
|
|
|
|
|
|
self.log_user()
|
|
|
user_group = user_util.create_user_group()
|
|
|
|
|
|
user_group_name = user_group.users_group_name
|
|
|
user_group_id = user_group.users_group_id
|
|
|
|
|
|
# ENABLE REPO CREATE ON A GROUP
|
|
|
perm_params = {
|
|
|
'inherit_default_permissions': False,
|
|
|
'default_repo_create': repo_create,
|
|
|
'default_repo_create_on_write': repo_create_write,
|
|
|
'default_user_group_create': user_group_create,
|
|
|
'default_repo_group_create': repo_group_create,
|
|
|
'default_fork_create': fork_create,
|
|
|
'default_inherit_default_permissions': inherit_default_permissions,
|
|
|
|
|
|
'csrf_token': self.csrf_token,
|
|
|
}
|
|
|
response = self.app.post(
|
|
|
route_path('edit_user_group_global_perms_update',
|
|
|
user_group_id=user_group_id),
|
|
|
params=perm_params)
|
|
|
|
|
|
if expect_form_error:
|
|
|
assert response.status_int == 200
|
|
|
response.mustcontain('Value must be one of')
|
|
|
else:
|
|
|
if expect_error:
|
|
|
msg = 'An error occurred during permissions saving'
|
|
|
else:
|
|
|
msg = 'User Group global permissions updated successfully'
|
|
|
ug = UserGroup.get_by_group_name(user_group_name)
|
|
|
del perm_params['csrf_token']
|
|
|
del perm_params['inherit_default_permissions']
|
|
|
assert perm_params == ug.get_default_perms()
|
|
|
assert_session_flash(response, msg)
|
|
|
|
|
|
def test_edit_view(self, user_util):
|
|
|
self.log_user()
|
|
|
|
|
|
user_group = user_util.create_user_group()
|
|
|
self.app.get(
|
|
|
route_path('edit_user_group',
|
|
|
user_group_id=user_group.users_group_id),
|
|
|
status=200)
|
|
|
|
|
|
def test_update_user_group(self, user_util):
|
|
|
user = self.log_user()
|
|
|
|
|
|
user_group = user_util.create_user_group()
|
|
|
users_group_id = user_group.users_group_id
|
|
|
new_name = user_group.users_group_name + '_CHANGE'
|
|
|
|
|
|
params = [
|
|
|
('users_group_active', False),
|
|
|
('user_group_description', 'DESC'),
|
|
|
('users_group_name', new_name),
|
|
|
('user', user['username']),
|
|
|
('csrf_token', self.csrf_token),
|
|
|
('__start__', 'user_group_members:sequence'),
|
|
|
('__start__', 'member:mapping'),
|
|
|
('member_user_id', user['user_id']),
|
|
|
('type', 'existing'),
|
|
|
('__end__', 'member:mapping'),
|
|
|
('__end__', 'user_group_members:sequence'),
|
|
|
]
|
|
|
|
|
|
self.app.post(
|
|
|
route_path('user_groups_update',
|
|
|
user_group_id=users_group_id),
|
|
|
params=params,
|
|
|
status=302)
|
|
|
|
|
|
user_group = UserGroup.get(users_group_id)
|
|
|
assert user_group
|
|
|
|
|
|
assert user_group.users_group_name == new_name
|
|
|
assert user_group.user_group_description == 'DESC'
|
|
|
assert user_group.users_group_active is False
|
|
|
|
|
|
def test_update_user_group_name_conflicts(self, user_util):
|
|
|
self.log_user()
|
|
|
user_group_old = user_util.create_user_group()
|
|
|
new_name = user_group_old.users_group_name
|
|
|
|
|
|
user_group = user_util.create_user_group()
|
|
|
|
|
|
params = dict(
|
|
|
users_group_active=False,
|
|
|
user_group_description='DESC',
|
|
|
users_group_name=new_name,
|
|
|
csrf_token=self.csrf_token)
|
|
|
|
|
|
response = self.app.post(
|
|
|
route_path('user_groups_update',
|
|
|
user_group_id=user_group.users_group_id),
|
|
|
params=params,
|
|
|
status=200)
|
|
|
|
|
|
response.mustcontain('User group `{}` already exists'.format(
|
|
|
new_name))
|
|
|
|
|
|
def test_update_members_from_user_ids(self, user_regular):
|
|
|
uid = user_regular.user_id
|
|
|
username = user_regular.username
|
|
|
self.log_user()
|
|
|
|
|
|
user_group = fixture.create_user_group('test_gr_ids')
|
|
|
assert user_group.members == []
|
|
|
assert user_group.user != user_regular
|
|
|
expected_active_state = not user_group.users_group_active
|
|
|
|
|
|
form_data = [
|
|
|
('csrf_token', self.csrf_token),
|
|
|
('user', username),
|
|
|
('users_group_name', 'changed_name'),
|
|
|
('users_group_active', expected_active_state),
|
|
|
('user_group_description', 'changed_description'),
|
|
|
|
|
|
('__start__', 'user_group_members:sequence'),
|
|
|
('__start__', 'member:mapping'),
|
|
|
('member_user_id', uid),
|
|
|
('type', 'existing'),
|
|
|
('__end__', 'member:mapping'),
|
|
|
('__end__', 'user_group_members:sequence'),
|
|
|
]
|
|
|
ugid = user_group.users_group_id
|
|
|
self.app.post(
|
|
|
route_path('user_groups_update', user_group_id=ugid), form_data)
|
|
|
|
|
|
user_group = UserGroup.get(ugid)
|
|
|
assert user_group
|
|
|
|
|
|
assert user_group.members[0].user_id == uid
|
|
|
assert user_group.user_id == uid
|
|
|
assert 'changed_name' in user_group.users_group_name
|
|
|
assert 'changed_description' in user_group.user_group_description
|
|
|
assert user_group.users_group_active == expected_active_state
|
|
|
|
|
|
fixture.destroy_user_group(user_group)
|
|
|
|