# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import mock
import pytest

from rhodecode.model.db import User, UserIpMap
from rhodecode.model.meta import Session
from rhodecode.model.permission import PermissionModel
from rhodecode.model.ssh_key import SshKeyModel
from rhodecode.tests import (
    TestController, clear_cache_regions, assert_session_flash)


def route_path(name, params=None, **kwargs):
    """
    Build the URL path for one of the admin routes used by these tests.

    :param name: key of the route in the local route table.
    :param params: optional mapping (or sequence of pairs) that is url-encoded
        and appended as the query string when truthy.
    :param kwargs: values for the ``{placeholders}`` in the route template,
        e.g. ``user_id``.
    :raises KeyError: when ``name`` is not a known route, or when a
        placeholder required by the template is missing from ``kwargs``.
    """
    # only urlencode() is needed from urllib, so import just urllib.parse
    import urllib.parse

    from rhodecode.apps._base import ADMIN_PREFIX

    base_url = {
        'edit_user_ips':
            ADMIN_PREFIX + '/users/{user_id}/edit/ips',
        'edit_user_ips_add':
            ADMIN_PREFIX + '/users/{user_id}/edit/ips/new',
        'edit_user_ips_delete':
            ADMIN_PREFIX + '/users/{user_id}/edit/ips/delete',

        'admin_permissions_application':
            ADMIN_PREFIX + '/permissions/application',
        'admin_permissions_application_update':
            ADMIN_PREFIX + '/permissions/application/update',

        'admin_permissions_global':
            ADMIN_PREFIX + '/permissions/global',
        'admin_permissions_global_update':
            ADMIN_PREFIX + '/permissions/global/update',

        'admin_permissions_object':
            ADMIN_PREFIX + '/permissions/object',
        'admin_permissions_object_update':
            ADMIN_PREFIX + '/permissions/object/update',

        'admin_permissions_ips':
            ADMIN_PREFIX + '/permissions/ips',
        'admin_permissions_overview':
            ADMIN_PREFIX + '/permissions/overview',

        'admin_permissions_ssh_keys':
            ADMIN_PREFIX + '/permissions/ssh_keys',
        'admin_permissions_ssh_keys_data':
            ADMIN_PREFIX + '/permissions/ssh_keys/data',
        'admin_permissions_ssh_keys_update':
            ADMIN_PREFIX + '/permissions/ssh_keys/update'
    }[name].format(**kwargs)

    if params:
        base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
    return base_url


def _assert_update_outcome(response, success_msg, expect_error, expect_form_error):
    """
    Check the response of a permissions-update POST against the expected outcome.

    :param response: response object returned by ``self.app.post``.
    :param success_msg: flash message expected when the update succeeds.
    :param expect_error: when True, the generic update-error flash message is
        expected instead of ``success_msg``.
    :param expect_form_error: when True, the form is expected to be rendered
        again (status 200) with a validation message; no flash is checked.
    """
    if expect_form_error:
        assert response.status_int == 200
        response.mustcontain('Value must be one of')
        return

    if expect_error:
        msg = 'Error occurred during update of permissions'
    else:
        msg = success_msg
    assert_session_flash(response, msg)


class TestAdminPermissionsController(TestController):
    """Functional tests for the admin permissions pages."""

    @pytest.fixture(scope='class', autouse=True)
    def prepare(self, request):
        """Register a class-level finalizer that re-creates default permissions."""

        # cleanup and reset to default permissions after
        @request.addfinalizer
        def cleanup():
            PermissionModel().create_default_user_permissions(
                User.get_default_user(), force=True)

    def test_index_application(self):
        """The application permissions page can be requested by a logged-in user."""
        self.log_user()
        self.app.get(route_path('admin_permissions_application'))

    @pytest.mark.parametrize(
        'anonymous, default_register, default_register_message, default_password_reset,'
        'default_extern_activate, expect_error, expect_form_error', [
            (True, 'hg.register.none', '', 'hg.password_reset.enabled',
             'hg.extern_activate.manual', False, False),
            (True, 'hg.register.manual_activate', '', 'hg.password_reset.enabled',
             'hg.extern_activate.auto', False, False),
            (True, 'hg.register.auto_activate', '', 'hg.password_reset.enabled',
             'hg.extern_activate.manual', False, False),
            # NOTE(review): identical to the previous case; presumably it
            # exercises re-submitting unchanged settings - confirm or drop.
            (True, 'hg.register.auto_activate', '', 'hg.password_reset.enabled',
             'hg.extern_activate.manual', False, False),
            (True, 'hg.register.XXX', '', 'hg.password_reset.enabled',
             'hg.extern_activate.manual', False, True),
            (True, '', '', 'hg.password_reset.enabled', '', True, False),
        ])
    def test_update_application_permissions(
            self, anonymous, default_register, default_register_message,
            default_password_reset, default_extern_activate, expect_error,
            expect_form_error):
        """POST application permissions and check the flash / form-error outcome."""
        self.log_user()

        # TODO: anonymous access set here to False, breaks some other tests
        params = {
            'csrf_token': self.csrf_token,
            'anonymous': anonymous,
            'default_register': default_register,
            'default_register_message': default_register_message,
            'default_password_reset': default_password_reset,
            'default_extern_activate': default_extern_activate,
        }
        response = self.app.post(
            route_path('admin_permissions_application_update'), params=params)

        _assert_update_outcome(
            response, 'Application permissions updated successfully',
            expect_error, expect_form_error)

    def test_index_object(self):
        """The object permissions page can be requested by a logged-in user."""
        self.log_user()
        self.app.get(route_path('admin_permissions_object'))

    @pytest.mark.parametrize(
        'repo, repo_group, user_group, expect_error, expect_form_error', [
            ('repository.none', 'group.none', 'usergroup.none', False, False),
            ('repository.read', 'group.read', 'usergroup.read', False, False),
            ('repository.write', 'group.write', 'usergroup.write', False, False),
            ('repository.admin', 'group.admin', 'usergroup.admin', False, False),
            ('repository.XXX', 'group.admin', 'usergroup.admin', False, True),
            ('', '', '', True, False),
        ])
    def test_update_object_permissions(self, repo, repo_group, user_group,
                                       expect_error, expect_form_error):
        """POST object permissions and check the flash / form-error outcome."""
        self.log_user()

        params = {
            'csrf_token': self.csrf_token,
            'default_repo_perm': repo,
            'overwrite_default_repo': False,
            'default_group_perm': repo_group,
            'overwrite_default_group': False,
            'default_user_group_perm': user_group,
            'overwrite_default_user_group': False,
        }
        response = self.app.post(
            route_path('admin_permissions_object_update'), params=params)

        _assert_update_outcome(
            response, 'Object permissions updated successfully',
            expect_error, expect_form_error)

    def test_index_global(self):
        """The global permissions page can be requested by a logged-in user."""
        self.log_user()
        self.app.get(route_path('admin_permissions_global'))

    @pytest.mark.parametrize(
        'repo_create, repo_create_write, user_group_create, repo_group_create,'
        'fork_create, inherit_default_permissions, expect_error,'
        'expect_form_error', [
            ('hg.create.none', 'hg.create.write_on_repogroup.false',
             'hg.usergroup.create.false', 'hg.repogroup.create.false',
             'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
            ('hg.create.repository', 'hg.create.write_on_repogroup.true',
             'hg.usergroup.create.true', 'hg.repogroup.create.true',
             'hg.fork.repository', 'hg.inherit_default_perms.false',
             False, False),
            ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
             'hg.usergroup.create.true', 'hg.repogroup.create.true',
             'hg.fork.repository', 'hg.inherit_default_perms.false',
             False, True),
            ('', '', '', '', '', '', True, False),
        ])
    def test_update_global_permissions(
            self, repo_create, repo_create_write, user_group_create,
            repo_group_create, fork_create, inherit_default_permissions,
            expect_error, expect_form_error):
        """POST global permissions and check the flash / form-error outcome."""
        self.log_user()

        params = {
            'csrf_token': self.csrf_token,
            'default_repo_create': repo_create,
            'default_repo_create_on_write': repo_create_write,
            'default_user_group_create': user_group_create,
            'default_repo_group_create': repo_group_create,
            'default_fork_create': fork_create,
            'default_inherit_default_permissions': inherit_default_permissions
        }
        response = self.app.post(
            route_path('admin_permissions_global_update'), params=params)

        _assert_update_outcome(
            response, 'Global permissions updated successfully',
            expect_error, expect_form_error)

    def test_index_ips(self):
        """The IP page reports that all addresses are allowed."""
        self.log_user()
        response = self.app.get(route_path('admin_permissions_ips'))
        response.mustcontain('All IP addresses are allowed')

    def test_add_delete_ips(self):
        """Add an IP range for the default user, then delete it again."""
        clear_cache_regions(['sql_cache_short'])
        self.log_user()
        default_user_id = User.get_default_user_id()

        # ADD
        self.app.post(
            route_path('edit_user_ips_add', user_id=default_user_id),
            params={'new_ip': '0.0.0.0/24', 'csrf_token': self.csrf_token})
        response = self.app.get(route_path('admin_permissions_ips'))
        response.mustcontain('0.0.0.0/24')
        response.mustcontain('0.0.0.0 - 0.0.0.255')

        # DELETE
        del_ip_id = UserIpMap.query().filter(
            UserIpMap.user_id == default_user_id).first().ip_id

        response = self.app.post(
            route_path('edit_user_ips_delete', user_id=default_user_id),
            params={'del_ip_id': del_ip_id, 'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Removed ip address from user whitelist')

        clear_cache_regions(['sql_cache_short'])

        response = self.app.get(route_path('admin_permissions_ips'))
        response.mustcontain('All IP addresses are allowed')
        response.mustcontain(no=['0.0.0.0/24'])
        response.mustcontain(no=['0.0.0.0 - 0.0.0.255'])

    def test_index_overview(self):
        """The permissions overview page can be requested by a logged-in user."""
        self.log_user()
        self.app.get(route_path('admin_permissions_overview'))

    def test_ssh_keys(self):
        """The SSH keys page responds with status 200."""
        self.log_user()
        self.app.get(route_path('admin_permissions_ssh_keys'), status=200)

    def test_ssh_keys_data(self, user_util, xhr_header):
        """The SSH keys data endpoint is empty at first, then lists a created key."""
        self.log_user()
        response = self.app.get(
            route_path('admin_permissions_ssh_keys_data'),
            extra_environ=xhr_header)
        assert response.json == {
            'data': [],
            'draw': None,
            'recordsFiltered': 0,
            'recordsTotal': 0,
        }

        dummy_user = user_util.create_user()
        SshKeyModel().create(dummy_user, 'ab:cd:ef', 'KEYKEY', 'test_key')
        Session().commit()

        response = self.app.get(
            route_path('admin_permissions_ssh_keys_data'),
            extra_environ=xhr_header)
        assert response.json['data'][0]['fingerprint'] == 'ab:cd:ef'

    def test_ssh_keys_update(self):
        """Posting the SSH keys update redirects and flashes a success message."""
        self.log_user()
        response = self.app.post(
            route_path('admin_permissions_ssh_keys_update'),
            dict(csrf_token=self.csrf_token), status=302)

        assert_session_flash(
            response, 'Updated SSH keys file')

    def test_ssh_keys_update_disabled(self):
        """With ``ssh_enabled`` patched to return False, a 'disabled' flash is shown."""
        self.log_user()

        from rhodecode.apps.admin.views.permissions import AdminPermissionsView
        with mock.patch.object(AdminPermissionsView, 'ssh_enabled',
                               return_value=False):
            response = self.app.post(
                route_path('admin_permissions_ssh_keys_update'),
                dict(csrf_token=self.csrf_token), status=302)

        assert_session_flash(
            response, 'SSH key support is disabled in .ini file')