|
|
|
|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import mock
|
|
|
import pytest
|
|
|
from rhodecode.model.db import User, UserIpMap
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.model.permission import PermissionModel
|
|
|
from rhodecode.model.ssh_key import SshKeyModel
|
|
|
from rhodecode.tests import (
|
|
|
TestController, clear_cache_regions, assert_session_flash)
|
|
|
|
|
|
|
|
|
def route_path(name, params=None, **kwargs):
|
|
|
import urllib.request
|
|
|
import urllib.parse
|
|
|
import urllib.error
|
|
|
from rhodecode.apps._base import ADMIN_PREFIX
|
|
|
|
|
|
base_url = {
|
|
|
'edit_user_ips':
|
|
|
ADMIN_PREFIX + '/users/{user_id}/edit/ips',
|
|
|
'edit_user_ips_add':
|
|
|
ADMIN_PREFIX + '/users/{user_id}/edit/ips/new',
|
|
|
'edit_user_ips_delete':
|
|
|
ADMIN_PREFIX + '/users/{user_id}/edit/ips/delete',
|
|
|
|
|
|
'admin_permissions_application':
|
|
|
ADMIN_PREFIX + '/permissions/application',
|
|
|
'admin_permissions_application_update':
|
|
|
ADMIN_PREFIX + '/permissions/application/update',
|
|
|
|
|
|
'admin_permissions_global':
|
|
|
ADMIN_PREFIX + '/permissions/global',
|
|
|
'admin_permissions_global_update':
|
|
|
ADMIN_PREFIX + '/permissions/global/update',
|
|
|
|
|
|
'admin_permissions_object':
|
|
|
ADMIN_PREFIX + '/permissions/object',
|
|
|
'admin_permissions_object_update':
|
|
|
ADMIN_PREFIX + '/permissions/object/update',
|
|
|
|
|
|
'admin_permissions_ips':
|
|
|
ADMIN_PREFIX + '/permissions/ips',
|
|
|
'admin_permissions_overview':
|
|
|
ADMIN_PREFIX + '/permissions/overview',
|
|
|
|
|
|
'admin_permissions_ssh_keys':
|
|
|
ADMIN_PREFIX + '/permissions/ssh_keys',
|
|
|
'admin_permissions_ssh_keys_data':
|
|
|
ADMIN_PREFIX + '/permissions/ssh_keys/data',
|
|
|
'admin_permissions_ssh_keys_update':
|
|
|
ADMIN_PREFIX + '/permissions/ssh_keys/update'
|
|
|
|
|
|
}[name].format(**kwargs)
|
|
|
|
|
|
if params:
|
|
|
base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
|
|
|
return base_url
|
|
|
|
|
|
|
|
|
class TestAdminPermissionsController(TestController):
|
|
|
|
|
|
@pytest.fixture(scope='class', autouse=True)
|
|
|
def prepare(self, request):
|
|
|
# cleanup and reset to default permissions after
|
|
|
@request.addfinalizer
|
|
|
def cleanup():
|
|
|
PermissionModel().create_default_user_permissions(
|
|
|
User.get_default_user(), force=True)
|
|
|
|
|
|
def test_index_application(self):
|
|
|
self.log_user()
|
|
|
self.app.get(route_path('admin_permissions_application'))
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
'anonymous, default_register, default_register_message, default_password_reset,'
|
|
|
'default_extern_activate, expect_error, expect_form_error', [
|
|
|
(True, 'hg.register.none', '', 'hg.password_reset.enabled', 'hg.extern_activate.manual',
|
|
|
False, False),
|
|
|
(True, 'hg.register.manual_activate', '', 'hg.password_reset.enabled', 'hg.extern_activate.auto',
|
|
|
False, False),
|
|
|
(True, 'hg.register.auto_activate', '', 'hg.password_reset.enabled', 'hg.extern_activate.manual',
|
|
|
False, False),
|
|
|
(True, 'hg.register.auto_activate', '', 'hg.password_reset.enabled', 'hg.extern_activate.manual',
|
|
|
False, False),
|
|
|
(True, 'hg.register.XXX', '', 'hg.password_reset.enabled', 'hg.extern_activate.manual',
|
|
|
False, True),
|
|
|
(True, '', '', 'hg.password_reset.enabled', '', True, False),
|
|
|
])
|
|
|
def test_update_application_permissions(
|
|
|
self, anonymous, default_register, default_register_message, default_password_reset,
|
|
|
default_extern_activate, expect_error, expect_form_error):
|
|
|
|
|
|
self.log_user()
|
|
|
|
|
|
# TODO: anonymous access set here to False, breaks some other tests
|
|
|
params = {
|
|
|
'csrf_token': self.csrf_token,
|
|
|
'anonymous': anonymous,
|
|
|
'default_register': default_register,
|
|
|
'default_register_message': default_register_message,
|
|
|
'default_password_reset': default_password_reset,
|
|
|
'default_extern_activate': default_extern_activate,
|
|
|
}
|
|
|
response = self.app.post(route_path('admin_permissions_application_update'),
|
|
|
params=params)
|
|
|
if expect_form_error:
|
|
|
assert response.status_int == 200
|
|
|
response.mustcontain('Value must be one of')
|
|
|
else:
|
|
|
if expect_error:
|
|
|
msg = 'Error occurred during update of permissions'
|
|
|
else:
|
|
|
msg = 'Application permissions updated successfully'
|
|
|
assert_session_flash(response, msg)
|
|
|
|
|
|
def test_index_object(self):
|
|
|
self.log_user()
|
|
|
self.app.get(route_path('admin_permissions_object'))
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
'repo, repo_group, user_group, expect_error, expect_form_error', [
|
|
|
('repository.none', 'group.none', 'usergroup.none', False, False),
|
|
|
('repository.read', 'group.read', 'usergroup.read', False, False),
|
|
|
('repository.write', 'group.write', 'usergroup.write',
|
|
|
False, False),
|
|
|
('repository.admin', 'group.admin', 'usergroup.admin',
|
|
|
False, False),
|
|
|
('repository.XXX', 'group.admin', 'usergroup.admin', False, True),
|
|
|
('', '', '', True, False),
|
|
|
])
|
|
|
def test_update_object_permissions(self, repo, repo_group, user_group,
|
|
|
expect_error, expect_form_error):
|
|
|
self.log_user()
|
|
|
|
|
|
params = {
|
|
|
'csrf_token': self.csrf_token,
|
|
|
'default_repo_perm': repo,
|
|
|
'overwrite_default_repo': False,
|
|
|
'default_group_perm': repo_group,
|
|
|
'overwrite_default_group': False,
|
|
|
'default_user_group_perm': user_group,
|
|
|
'overwrite_default_user_group': False,
|
|
|
}
|
|
|
response = self.app.post(route_path('admin_permissions_object_update'),
|
|
|
params=params)
|
|
|
if expect_form_error:
|
|
|
assert response.status_int == 200
|
|
|
response.mustcontain('Value must be one of')
|
|
|
else:
|
|
|
if expect_error:
|
|
|
msg = 'Error occurred during update of permissions'
|
|
|
else:
|
|
|
msg = 'Object permissions updated successfully'
|
|
|
assert_session_flash(response, msg)
|
|
|
|
|
|
def test_index_global(self):
|
|
|
self.log_user()
|
|
|
self.app.get(route_path('admin_permissions_global'))
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
'repo_create, repo_create_write, user_group_create, repo_group_create,'
|
|
|
'fork_create, inherit_default_permissions, expect_error,'
|
|
|
'expect_form_error', [
|
|
|
('hg.create.none', 'hg.create.write_on_repogroup.false',
|
|
|
'hg.usergroup.create.false', 'hg.repogroup.create.false',
|
|
|
'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
|
|
|
('hg.create.repository', 'hg.create.write_on_repogroup.true',
|
|
|
'hg.usergroup.create.true', 'hg.repogroup.create.true',
|
|
|
'hg.fork.repository', 'hg.inherit_default_perms.false',
|
|
|
False, False),
|
|
|
('hg.create.XXX', 'hg.create.write_on_repogroup.true',
|
|
|
'hg.usergroup.create.true', 'hg.repogroup.create.true',
|
|
|
'hg.fork.repository', 'hg.inherit_default_perms.false',
|
|
|
False, True),
|
|
|
('', '', '', '', '', '', True, False),
|
|
|
])
|
|
|
def test_update_global_permissions(
|
|
|
self, repo_create, repo_create_write, user_group_create,
|
|
|
repo_group_create, fork_create, inherit_default_permissions,
|
|
|
expect_error, expect_form_error):
|
|
|
self.log_user()
|
|
|
|
|
|
params = {
|
|
|
'csrf_token': self.csrf_token,
|
|
|
'default_repo_create': repo_create,
|
|
|
'default_repo_create_on_write': repo_create_write,
|
|
|
'default_user_group_create': user_group_create,
|
|
|
'default_repo_group_create': repo_group_create,
|
|
|
'default_fork_create': fork_create,
|
|
|
'default_inherit_default_permissions': inherit_default_permissions
|
|
|
}
|
|
|
response = self.app.post(route_path('admin_permissions_global_update'),
|
|
|
params=params)
|
|
|
if expect_form_error:
|
|
|
assert response.status_int == 200
|
|
|
response.mustcontain('Value must be one of')
|
|
|
else:
|
|
|
if expect_error:
|
|
|
msg = 'Error occurred during update of permissions'
|
|
|
else:
|
|
|
msg = 'Global permissions updated successfully'
|
|
|
assert_session_flash(response, msg)
|
|
|
|
|
|
def test_index_ips(self):
|
|
|
self.log_user()
|
|
|
response = self.app.get(route_path('admin_permissions_ips'))
|
|
|
response.mustcontain('All IP addresses are allowed')
|
|
|
|
|
|
def test_add_delete_ips(self):
|
|
|
clear_cache_regions(['sql_cache_short'])
|
|
|
self.log_user()
|
|
|
|
|
|
# ADD
|
|
|
default_user_id = User.get_default_user_id()
|
|
|
self.app.post(
|
|
|
route_path('edit_user_ips_add', user_id=default_user_id),
|
|
|
params={'new_ip': '0.0.0.0/24', 'csrf_token': self.csrf_token})
|
|
|
|
|
|
response = self.app.get(route_path('admin_permissions_ips'))
|
|
|
response.mustcontain('0.0.0.0/24')
|
|
|
response.mustcontain('0.0.0.0 - 0.0.0.255')
|
|
|
|
|
|
# DELETE
|
|
|
default_user_id = User.get_default_user_id()
|
|
|
del_ip_id = UserIpMap.query().filter(UserIpMap.user_id ==
|
|
|
default_user_id).first().ip_id
|
|
|
|
|
|
response = self.app.post(
|
|
|
route_path('edit_user_ips_delete', user_id=default_user_id),
|
|
|
params={'del_ip_id': del_ip_id, 'csrf_token': self.csrf_token})
|
|
|
|
|
|
assert_session_flash(response, 'Removed ip address from user whitelist')
|
|
|
|
|
|
clear_cache_regions(['sql_cache_short'])
|
|
|
response = self.app.get(route_path('admin_permissions_ips'))
|
|
|
response.mustcontain('All IP addresses are allowed')
|
|
|
response.mustcontain(no=['0.0.0.0/24'])
|
|
|
response.mustcontain(no=['0.0.0.0 - 0.0.0.255'])
|
|
|
|
|
|
def test_index_overview(self):
|
|
|
self.log_user()
|
|
|
self.app.get(route_path('admin_permissions_overview'))
|
|
|
|
|
|
def test_ssh_keys(self):
|
|
|
self.log_user()
|
|
|
self.app.get(route_path('admin_permissions_ssh_keys'), status=200)
|
|
|
|
|
|
def test_ssh_keys_data(self, user_util, xhr_header):
|
|
|
self.log_user()
|
|
|
response = self.app.get(route_path('admin_permissions_ssh_keys_data'),
|
|
|
extra_environ=xhr_header)
|
|
|
assert response.json == {u'data': [], u'draw': None,
|
|
|
u'recordsFiltered': 0, u'recordsTotal': 0}
|
|
|
|
|
|
dummy_user = user_util.create_user()
|
|
|
SshKeyModel().create(dummy_user, 'ab:cd:ef', 'KEYKEY', 'test_key')
|
|
|
Session().commit()
|
|
|
response = self.app.get(route_path('admin_permissions_ssh_keys_data'),
|
|
|
extra_environ=xhr_header)
|
|
|
assert response.json['data'][0]['fingerprint'] == 'ab:cd:ef'
|
|
|
|
|
|
def test_ssh_keys_update(self):
|
|
|
self.log_user()
|
|
|
response = self.app.post(
|
|
|
route_path('admin_permissions_ssh_keys_update'),
|
|
|
dict(csrf_token=self.csrf_token), status=302)
|
|
|
|
|
|
assert_session_flash(
|
|
|
response, 'Updated SSH keys file')
|
|
|
|
|
|
def test_ssh_keys_update_disabled(self):
|
|
|
self.log_user()
|
|
|
|
|
|
from rhodecode.apps.admin.views.permissions import AdminPermissionsView
|
|
|
with mock.patch.object(AdminPermissionsView, 'ssh_enabled',
|
|
|
return_value=False):
|
|
|
response = self.app.post(
|
|
|
route_path('admin_permissions_ssh_keys_update'),
|
|
|
dict(csrf_token=self.csrf_token), status=302)
|
|
|
|
|
|
assert_session_flash(
|
|
|
response, 'SSH key support is disabled in .ini file')
|