# Copyright (C) 2010-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import pytest from mock import patch from rhodecode.lib import auth from rhodecode.lib.str_utils import safe_bytes from rhodecode.lib.hash_utils import md5_safe, sha1 from rhodecode.model.auth_token import AuthTokenModel from rhodecode.model.db import Session, User from rhodecode.model.repo import RepoModel from rhodecode.model.user import UserModel from rhodecode.model.user_group import UserGroupModel def repickle(obj): import pickle return pickle.loads(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) def test_perm_origin_dict(): pod = auth.PermOriginDict() pod['thing'] = 'read', 'default', 1 assert pod['thing'] == 'read' assert pod.perm_origin_stack == { 'thing': [('read', 'default', 1)]} pod['thing'] = 'write', 'admin', 1 assert pod['thing'] == 'write' assert pod.perm_origin_stack == { 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} pod['other'] = 'write', 'default', 8 assert pod.perm_origin_stack == { 'other': [('write', 'default', 8)], 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} pod['other'] = 'none', 'override', 8 assert pod.perm_origin_stack == { 'other': [('write', 'default', 8), ('none', 'override', 8)], 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} # we can still save regular key pod['thing'] = 'read' with pytest.raises(ValueError): pod['thing'] = 'read', 'missing-3td-key' def test_perm_origin_dict_serialization(): pod = auth.PermOriginDict() pod['thing'] = 'read', 'default', 1 assert pod['thing'] == 'read' pod = repickle(pod) assert pod['thing'] == 'read' assert pod.perm_origin_stack == { 'thing': [('read', 'default', 1)]} pod['thing'] = 'write', 'admin', 1 assert pod['thing'] == 'write' assert pod['thing'] == 'write' assert pod.perm_origin_stack == { 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} pod = repickle(pod) assert pod['thing'] == 'write' assert pod['thing'] == 'write' assert pod.perm_origin_stack == { 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} pod['other'] = 'write', 'default', 8 assert pod.perm_origin_stack == { 'other': [('write', 'default', 8)], 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} pod = repickle(pod) assert pod.perm_origin_stack == { 'other': [('write', 'default', 8)], 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} pod['other'] = 'none', 'override', 8 assert pod.perm_origin_stack == { 'other': [('write', 'default', 8), ('none', 'override', 8)], 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} pod = repickle(pod) assert pod.perm_origin_stack == { 'other': [('write', 'default', 8), ('none', 'override', 8)], 'thing': [('read', 'default', 1), ('write', 'admin', 1)]} pod['thing'] = 'read' with pytest.raises(ValueError): pod['thing'] = 'read', 'missing-3td-key' def test_branch_perm_origin_dict(): pod = auth.BranchPermOriginDict() pod['resource'] = {'*pattern': 'read'}, 'default' assert pod['resource'] == {'*pattern': 'read'} assert pod.perm_origin_stack == {'resource': {'*pattern': [('read', 'default')]}} # 2nd call pod['resource'] = {'*pattern': 'write'}, 'admin' assert pod['resource'] == {'*pattern': 'write'} assert pod.perm_origin_stack == {'resource': {'*pattern': [('read', 'default'), ('write', 'admin')]}} def test_branch_perm_origin_dict_serialization(): pod = auth.BranchPermOriginDict() pod['resource'] = {'*pattern': 'read'}, 'default' assert pod['resource'] == {'*pattern': 'read'} assert pod.perm_origin_stack == {'resource': {'*pattern': [('read', 'default')]}} pod = repickle(pod) assert pod['resource'] == {'*pattern': 'read'} assert pod.perm_origin_stack == {'resource': {'*pattern': [('read', 'default')]}} # 2nd call pod['resource'] = {'*pattern': 'write'}, 'admin' assert pod['resource'] == {'*pattern': 'write'} assert pod.perm_origin_stack == {'resource': {'*pattern': [('read', 'default'), ('write', 'admin')]}} pod = repickle(pod) assert pod['resource'] == {'*pattern': 'write'} assert pod.perm_origin_stack == {'resource': {'*pattern': [('read', 'default'), ('write', 'admin')]}} def test_cached_perms_data(user_regular, backend_random): permissions = get_permissions(user_regular) repo_name = backend_random.repo.repo_name expected_global_permissions = { 'repository.read', 'group.read', 'usergroup.read'} assert expected_global_permissions.issubset(permissions['global']) assert permissions['repositories'][repo_name] == 'repository.read' def test_cached_perms_data_with_admin_user(user_regular, backend_random): permissions = get_permissions(user_regular, user_is_admin=True) repo_name = backend_random.repo.repo_name assert 'hg.admin' in permissions['global'] assert permissions['repositories'][repo_name] == 'repository.admin' def test_cached_perms_data_with_admin_user_extended_calculation(user_regular, backend_random): permissions = get_permissions(user_regular, user_is_admin=True, calculate_super_admin=True) repo_name = backend_random.repo.repo_name assert 'hg.admin' in permissions['global'] assert permissions['repositories'][repo_name] == 'repository.admin' def test_cached_perms_data_user_group_global_permissions(user_util): user, user_group = user_util.create_user_with_group() user_group.inherit_default_permissions = False granted_permission = 'repository.write' UserGroupModel().grant_perm(user_group, granted_permission) Session().commit() permissions = get_permissions(user) assert granted_permission in permissions['global'] @pytest.mark.xfail(reason="Not implemented, see TODO note") def test_cached_perms_data_user_group_global_permissions_(user_util): user, user_group = user_util.create_user_with_group() granted_permission = 'repository.write' UserGroupModel().grant_perm(user_group, granted_permission) Session().commit() permissions = get_permissions(user) assert granted_permission in permissions['global'] def test_cached_perms_data_user_global_permissions(user_util): user = user_util.create_user() UserModel().grant_perm(user, 'repository.none') Session().commit() permissions = get_permissions(user, user_inherit_default_permissions=True) assert 'repository.read' in permissions['global'] def test_cached_perms_data_repository_permissions_on_private_repository( backend_random, user_util): user, user_group = user_util.create_user_with_group() repo = backend_random.create_repo() repo.private = True granted_permission = 'repository.write' RepoModel().grant_user_group_permission( repo, user_group.users_group_name, granted_permission) Session().commit() permissions = get_permissions(user) assert permissions['repositories'][repo.repo_name] == granted_permission def test_cached_perms_data_repository_permissions_for_owner( backend_random, user_util): user = user_util.create_user() repo = backend_random.create_repo() repo.user_id = user.user_id permissions = get_permissions(user) assert permissions['repositories'][repo.repo_name] == 'repository.admin' # TODO: johbo: Make cleanup in UserUtility smarter, then remove this hack repo.user_id = User.get_default_user_id() def test_cached_perms_data_repository_permissions_not_inheriting_defaults( backend_random, user_util): user = user_util.create_user() repo = backend_random.create_repo() # Don't inherit default object permissions UserModel().grant_perm(user, 'hg.inherit_default_perms.false') Session().commit() permissions = get_permissions(user) assert permissions['repositories'][repo.repo_name] == 'repository.none' def test_cached_perms_data_default_permissions_on_repository_group(user_util): # Have a repository group with default permissions set repo_group = user_util.create_repo_group() default_user = User.get_default_user() user_util.grant_user_permission_to_repo_group( repo_group, default_user, 'repository.write') user = user_util.create_user() permissions = get_permissions(user) assert permissions['repositories_groups'][repo_group.group_name] == \ 'repository.write' def test_cached_perms_data_default_permissions_on_repository_group_owner( user_util): # Have a repository group repo_group = user_util.create_repo_group() default_user = User.get_default_user() # Add a permission for the default user to hit the code path user_util.grant_user_permission_to_repo_group( repo_group, default_user, 'repository.write') # Have an owner of the group user = user_util.create_user() repo_group.user_id = user.user_id permissions = get_permissions(user) assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.admin' def test_cached_perms_data_default_permissions_on_repository_group_no_inherit( user_util): # Have a repository group repo_group = user_util.create_repo_group() default_user = User.get_default_user() # Add a permission for the default user to hit the code path user_util.grant_user_permission_to_repo_group( repo_group, default_user, 'repository.write') # Don't inherit default object permissions user = user_util.create_user() UserModel().grant_perm(user, 'hg.inherit_default_perms.false') Session().commit() permissions = get_permissions(user) assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.none' def test_cached_perms_data_repository_permissions_from_user_group( user_util, backend_random): user, user_group = user_util.create_user_with_group() # Needs a second user group to make sure that we select the right # permissions. user_group2 = user_util.create_user_group() UserGroupModel().add_user_to_group(user_group2, user) repo = backend_random.create_repo() RepoModel().grant_user_group_permission( repo, user_group.users_group_name, 'repository.read') RepoModel().grant_user_group_permission( repo, user_group2.users_group_name, 'repository.write') Session().commit() permissions = get_permissions(user) assert permissions['repositories'][repo.repo_name] == 'repository.write' def test_cached_perms_data_repository_permissions_from_user_group_owner( user_util, backend_random): user, user_group = user_util.create_user_with_group() repo = backend_random.create_repo() repo.user_id = user.user_id RepoModel().grant_user_group_permission( repo, user_group.users_group_name, 'repository.write') Session().commit() permissions = get_permissions(user) assert permissions['repositories'][repo.repo_name] == 'repository.admin' def test_cached_perms_data_user_repository_permissions( user_util, backend_random): user = user_util.create_user() repo = backend_random.create_repo() granted_permission = 'repository.write' RepoModel().grant_user_permission(repo, user, granted_permission) Session().commit() permissions = get_permissions(user) assert permissions['repositories'][repo.repo_name] == granted_permission def test_cached_perms_data_user_repository_permissions_explicit( user_util, backend_random): user = user_util.create_user() repo = backend_random.create_repo() granted_permission = 'repository.none' RepoModel().grant_user_permission(repo, user, granted_permission) Session().commit() permissions = get_permissions(user, explicit=True) assert permissions['repositories'][repo.repo_name] == granted_permission def test_cached_perms_data_user_repository_permissions_owner( user_util, backend_random): user = user_util.create_user() repo = backend_random.create_repo() repo.user_id = user.user_id RepoModel().grant_user_permission(repo, user, 'repository.write') Session().commit() permissions = get_permissions(user) assert permissions['repositories'][repo.repo_name] == 'repository.admin' def test_cached_perms_data_repository_groups_permissions_inherited( user_util, backend_random): user, user_group = user_util.create_user_with_group() # Needs a second group to hit the last condition user_group2 = user_util.create_user_group() UserGroupModel().add_user_to_group(user_group2, user) repo_group = user_util.create_repo_group() user_util.grant_user_group_permission_to_repo_group( repo_group, user_group, 'group.read') user_util.grant_user_group_permission_to_repo_group( repo_group, user_group2, 'group.write') permissions = get_permissions(user) assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.write' def test_cached_perms_data_repository_groups_permissions_inherited_owner( user_util, backend_random): user, user_group = user_util.create_user_with_group() repo_group = user_util.create_repo_group() repo_group.user_id = user.user_id granted_permission = 'group.write' user_util.grant_user_group_permission_to_repo_group( repo_group, user_group, granted_permission) permissions = get_permissions(user) assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.admin' def test_cached_perms_data_repository_groups_permissions( user_util, backend_random): user = user_util.create_user() repo_group = user_util.create_repo_group() granted_permission = 'group.write' user_util.grant_user_permission_to_repo_group( repo_group, user, granted_permission) permissions = get_permissions(user) assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.write' def test_cached_perms_data_repository_groups_permissions_explicit( user_util, backend_random): user = user_util.create_user() repo_group = user_util.create_repo_group() granted_permission = 'group.none' user_util.grant_user_permission_to_repo_group( repo_group, user, granted_permission) permissions = get_permissions(user, explicit=True) assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.none' def test_cached_perms_data_repository_groups_permissions_owner( user_util, backend_random): user = user_util.create_user() repo_group = user_util.create_repo_group() repo_group.user_id = user.user_id granted_permission = 'group.write' user_util.grant_user_permission_to_repo_group( repo_group, user, granted_permission) permissions = get_permissions(user) assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.admin' def test_cached_perms_data_user_group_permissions_inherited( user_util, backend_random): user, user_group = user_util.create_user_with_group() user_group2 = user_util.create_user_group() UserGroupModel().add_user_to_group(user_group2, user) target_user_group = user_util.create_user_group() user_util.grant_user_group_permission_to_user_group( target_user_group, user_group, 'usergroup.read') user_util.grant_user_group_permission_to_user_group( target_user_group, user_group2, 'usergroup.write') permissions = get_permissions(user) assert permissions['user_groups'][target_user_group.users_group_name] == \ 'usergroup.write' def test_cached_perms_data_user_group_permissions( user_util, backend_random): user = user_util.create_user() user_group = user_util.create_user_group() UserGroupModel().grant_user_permission(user_group, user, 'usergroup.write') Session().commit() permissions = get_permissions(user) assert permissions['user_groups'][user_group.users_group_name] == \ 'usergroup.write' def test_cached_perms_data_user_group_permissions_explicit( user_util, backend_random): user = user_util.create_user() user_group = user_util.create_user_group() UserGroupModel().grant_user_permission(user_group, user, 'usergroup.none') Session().commit() permissions = get_permissions(user, explicit=True) assert permissions['user_groups'][user_group.users_group_name] == \ 'usergroup.none' def test_cached_perms_data_user_group_permissions_not_inheriting_defaults( user_util, backend_random): user = user_util.create_user() user_group = user_util.create_user_group() # Don't inherit default object permissions UserModel().grant_perm(user, 'hg.inherit_default_perms.false') Session().commit() permissions = get_permissions(user) assert permissions['user_groups'][user_group.users_group_name] == \ 'usergroup.none' def test_permission_calculator_admin_permissions( user_util, backend_random): user = user_util.create_user() user_group = user_util.create_user_group() repo = backend_random.repo repo_group = user_util.create_repo_group() calculator = auth.PermissionCalculator( user.user_id, {}, False, False, True, 'higherwin') permissions = calculator._calculate_super_admin_permissions() assert permissions['repositories_groups'][repo_group.group_name] == \ 'group.admin' assert permissions['user_groups'][user_group.users_group_name] == \ 'usergroup.admin' assert permissions['repositories'][repo.repo_name] == 'repository.admin' assert 'hg.admin' in permissions['global'] def test_permission_calculator_repository_permissions_robustness_from_group( user_util, backend_random): user, user_group = user_util.create_user_with_group() RepoModel().grant_user_group_permission( backend_random.repo, user_group.users_group_name, 'repository.write') calculator = auth.PermissionCalculator( user.user_id, {}, False, False, False, 'higherwin') calculator._calculate_repository_permissions() def test_permission_calculator_repository_permissions_robustness_from_user( user_util, backend_random): user = user_util.create_user() RepoModel().grant_user_permission( backend_random.repo, user, 'repository.write') Session().commit() calculator = auth.PermissionCalculator( user.user_id, {}, False, False, False, 'higherwin') calculator._calculate_repository_permissions() def test_permission_calculator_repo_group_permissions_robustness_from_group( user_util, backend_random): user, user_group = user_util.create_user_with_group() repo_group = user_util.create_repo_group() user_util.grant_user_group_permission_to_repo_group( repo_group, user_group, 'group.write') calculator = auth.PermissionCalculator( user.user_id, {}, False, False, False, 'higherwin') calculator._calculate_repository_group_permissions() def test_permission_calculator_repo_group_permissions_robustness_from_user( user_util, backend_random): user = user_util.create_user() repo_group = user_util.create_repo_group() user_util.grant_user_permission_to_repo_group( repo_group, user, 'group.write') calculator = auth.PermissionCalculator( user.user_id, {}, False, False, False, 'higherwin') calculator._calculate_repository_group_permissions() def test_permission_calculator_user_group_permissions_robustness_from_group( user_util, backend_random): user, user_group = user_util.create_user_with_group() target_user_group = user_util.create_user_group() user_util.grant_user_group_permission_to_user_group( target_user_group, user_group, 'usergroup.write') calculator = auth.PermissionCalculator( user.user_id, {}, False, False, False, 'higherwin') calculator._calculate_user_group_permissions() def test_permission_calculator_user_group_permissions_robustness_from_user( user_util, backend_random): user = user_util.create_user() target_user_group = user_util.create_user_group() user_util.grant_user_permission_to_user_group( target_user_group, user, 'usergroup.write') calculator = auth.PermissionCalculator( user.user_id, {}, False, False, False, 'higherwin') calculator._calculate_user_group_permissions() @pytest.mark.parametrize("algo, new_permission, old_permission, expected", [ ('higherwin', 'repository.none', 'repository.none', 'repository.none'), ('higherwin', 'repository.read', 'repository.none', 'repository.read'), ('lowerwin', 'repository.write', 'repository.write', 'repository.write'), ('lowerwin', 'repository.read', 'repository.write', 'repository.read'), ]) def test_permission_calculator_choose_permission( user_regular, algo, new_permission, old_permission, expected): calculator = auth.PermissionCalculator( user_regular.user_id, {}, False, False, False, algo) result = calculator._choose_permission(new_permission, old_permission) assert result == expected def test_permission_calculator_choose_permission_raises_on_wrong_algo( user_regular): calculator = auth.PermissionCalculator( user_regular.user_id, {}, False, False, False, 'invalid') result = calculator._choose_permission( 'repository.read', 'repository.read') # TODO: johbo: This documents the existing behavior. Think of an # improvement. assert result is None def test_auth_user_get_cookie_store_for_normal_user(user_util): user = user_util.create_user() auth_user = auth.AuthUser(user_id=user.user_id) expected_data = { 'username': user.username, 'user_id': user.user_id, 'password': md5_safe(user.password), 'is_authenticated': False } assert auth_user.get_cookie_store() == expected_data def test_auth_user_get_cookie_store_for_default_user(): default_user = User.get_default_user() auth_user = auth.AuthUser() expected_data = { 'username': User.DEFAULT_USER, 'user_id': default_user.user_id, 'password': md5_safe(default_user.password), 'is_authenticated': True } assert auth_user.get_cookie_store() == expected_data def get_permissions(user, **kwargs): """ Utility filling in useful defaults into the call to `_cached_perms_data`. Fill in `**kwargs` if specific values are needed for a test. """ call_args = { 'user_id': user.user_id, 'scope': {}, 'user_is_admin': False, 'user_inherit_default_permissions': False, 'explicit': False, 'algo': 'higherwin', 'calculate_super_admin': False, } call_args.update(kwargs) permissions = auth._cached_perms_data(**call_args) return permissions class TestGenerateAuthToken(object): def test_salt_is_used_when_specified(self): salt = b'abcde' user_name = 'test_user' result = auth.generate_auth_token(user_name, salt) expected_result = sha1(safe_bytes(user_name) + salt) assert result == expected_result def test_salt_is_geneated_when_not_specified(self): user_name = 'test_user' random_salt = os.urandom(16) with patch.object(auth, 'os') as os_mock: os_mock.urandom.return_value = random_salt result = auth.generate_auth_token(user_name) expected_result = sha1(safe_bytes(user_name) + random_salt) assert result == expected_result @pytest.mark.parametrize("test_token, test_roles, auth_result, expected_tokens", [ ('', None, False, []), ('wrongtoken', None, False, []), ('abracadabra_vcs', [AuthTokenModel.cls.ROLE_API], False, [('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1)]), ('abracadabra_api', [AuthTokenModel.cls.ROLE_API], True, [('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1)]), ('abracadabra_api', [AuthTokenModel.cls.ROLE_API], True, [('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1), ('abracadabra_http', AuthTokenModel.cls.ROLE_HTTP, -1)]), ]) def test_auth_by_token(test_token, test_roles, auth_result, expected_tokens, user_util): user = user_util.create_user() user_id = user.user_id for token, role, expires in expected_tokens: new_token = AuthTokenModel().create(user_id, u'test-token', expires, role) new_token.api_key = token # inject known name for testing... assert auth_result == user.authenticate_by_token( test_token, roles=test_roles)