test_auth.py
632 lines
| 22.0 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
r3363 | # Copyright (C) 2010-2019 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
from hashlib import sha1 | ||||
import pytest | ||||
from mock import patch | ||||
from rhodecode.lib import auth | ||||
from rhodecode.lib.utils2 import md5 | ||||
r1421 | from rhodecode.model.auth_token import AuthTokenModel | |||
r3981 | from rhodecode.model.db import Session, User | |||
r1 | from rhodecode.model.repo import RepoModel | |||
from rhodecode.model.user import UserModel | ||||
from rhodecode.model.user_group import UserGroupModel | ||||
r100 | def test_perm_origin_dict(): | |||
pod = auth.PermOriginDict() | ||||
pod['thing'] = 'read', 'default' | ||||
assert pod['thing'] == 'read' | ||||
assert pod.perm_origin_stack == { | ||||
'thing': [('read', 'default')]} | ||||
pod['thing'] = 'write', 'admin' | ||||
assert pod['thing'] == 'write' | ||||
assert pod.perm_origin_stack == { | ||||
'thing': [('read', 'default'), ('write', 'admin')]} | ||||
pod['other'] = 'write', 'default' | ||||
assert pod.perm_origin_stack == { | ||||
'other': [('write', 'default')], | ||||
'thing': [('read', 'default'), ('write', 'admin')]} | ||||
pod['other'] = 'none', 'override' | ||||
assert pod.perm_origin_stack == { | ||||
'other': [('write', 'default'), ('none', 'override')], | ||||
'thing': [('read', 'default'), ('write', 'admin')]} | ||||
with pytest.raises(ValueError): | ||||
pod['thing'] = 'read' | ||||
r1 | def test_cached_perms_data(user_regular, backend_random): | |||
permissions = get_permissions(user_regular) | ||||
repo_name = backend_random.repo.repo_name | ||||
expected_global_permissions = { | ||||
'repository.read', 'group.read', 'usergroup.read'} | ||||
assert expected_global_permissions.issubset(permissions['global']) | ||||
assert permissions['repositories'][repo_name] == 'repository.read' | ||||
def test_cached_perms_data_with_admin_user(user_regular, backend_random): | ||||
permissions = get_permissions(user_regular, user_is_admin=True) | ||||
repo_name = backend_random.repo.repo_name | ||||
assert 'hg.admin' in permissions['global'] | ||||
assert permissions['repositories'][repo_name] == 'repository.admin' | ||||
r2065 | def test_cached_perms_data_with_admin_user_extended_calculation(user_regular, backend_random): | |||
permissions = get_permissions(user_regular, user_is_admin=True, | ||||
calculate_super_admin=True) | ||||
repo_name = backend_random.repo.repo_name | ||||
assert 'hg.admin' in permissions['global'] | ||||
assert permissions['repositories'][repo_name] == 'repository.admin' | ||||
r1 | def test_cached_perms_data_user_group_global_permissions(user_util): | |||
user, user_group = user_util.create_user_with_group() | ||||
user_group.inherit_default_permissions = False | ||||
granted_permission = 'repository.write' | ||||
UserGroupModel().grant_perm(user_group, granted_permission) | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert granted_permission in permissions['global'] | ||||
@pytest.mark.xfail(reason="Not implemented, see TODO note") | ||||
def test_cached_perms_data_user_group_global_permissions_(user_util): | ||||
user, user_group = user_util.create_user_with_group() | ||||
granted_permission = 'repository.write' | ||||
UserGroupModel().grant_perm(user_group, granted_permission) | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert granted_permission in permissions['global'] | ||||
def test_cached_perms_data_user_global_permissions(user_util): | ||||
user = user_util.create_user() | ||||
UserModel().grant_perm(user, 'repository.none') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user, user_inherit_default_permissions=True) | ||||
assert 'repository.read' in permissions['global'] | ||||
def test_cached_perms_data_repository_permissions_on_private_repository( | ||||
backend_random, user_util): | ||||
user, user_group = user_util.create_user_with_group() | ||||
repo = backend_random.create_repo() | ||||
repo.private = True | ||||
granted_permission = 'repository.write' | ||||
RepoModel().grant_user_group_permission( | ||||
repo, user_group.users_group_name, granted_permission) | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories'][repo.repo_name] == granted_permission | ||||
def test_cached_perms_data_repository_permissions_for_owner( | ||||
backend_random, user_util): | ||||
user = user_util.create_user() | ||||
repo = backend_random.create_repo() | ||||
repo.user_id = user.user_id | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories'][repo.repo_name] == 'repository.admin' | ||||
# TODO: johbo: Make cleanup in UserUtility smarter, then remove this hack | ||||
repo.user_id = User.get_default_user().user_id | ||||
def test_cached_perms_data_repository_permissions_not_inheriting_defaults( | ||||
backend_random, user_util): | ||||
user = user_util.create_user() | ||||
repo = backend_random.create_repo() | ||||
# Don't inherit default object permissions | ||||
UserModel().grant_perm(user, 'hg.inherit_default_perms.false') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories'][repo.repo_name] == 'repository.none' | ||||
def test_cached_perms_data_default_permissions_on_repository_group(user_util): | ||||
# Have a repository group with default permissions set | ||||
repo_group = user_util.create_repo_group() | ||||
default_user = User.get_default_user() | ||||
user_util.grant_user_permission_to_repo_group( | ||||
repo_group, default_user, 'repository.write') | ||||
user = user_util.create_user() | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'repository.write' | ||||
def test_cached_perms_data_default_permissions_on_repository_group_owner( | ||||
user_util): | ||||
# Have a repository group | ||||
repo_group = user_util.create_repo_group() | ||||
default_user = User.get_default_user() | ||||
# Add a permission for the default user to hit the code path | ||||
user_util.grant_user_permission_to_repo_group( | ||||
repo_group, default_user, 'repository.write') | ||||
# Have an owner of the group | ||||
user = user_util.create_user() | ||||
repo_group.user_id = user.user_id | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'group.admin' | ||||
def test_cached_perms_data_default_permissions_on_repository_group_no_inherit( | ||||
user_util): | ||||
# Have a repository group | ||||
repo_group = user_util.create_repo_group() | ||||
default_user = User.get_default_user() | ||||
# Add a permission for the default user to hit the code path | ||||
user_util.grant_user_permission_to_repo_group( | ||||
repo_group, default_user, 'repository.write') | ||||
# Don't inherit default object permissions | ||||
user = user_util.create_user() | ||||
UserModel().grant_perm(user, 'hg.inherit_default_perms.false') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'group.none' | ||||
def test_cached_perms_data_repository_permissions_from_user_group( | ||||
user_util, backend_random): | ||||
user, user_group = user_util.create_user_with_group() | ||||
# Needs a second user group to make sure that we select the right | ||||
# permissions. | ||||
user_group2 = user_util.create_user_group() | ||||
UserGroupModel().add_user_to_group(user_group2, user) | ||||
repo = backend_random.create_repo() | ||||
RepoModel().grant_user_group_permission( | ||||
repo, user_group.users_group_name, 'repository.read') | ||||
RepoModel().grant_user_group_permission( | ||||
repo, user_group2.users_group_name, 'repository.write') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories'][repo.repo_name] == 'repository.write' | ||||
def test_cached_perms_data_repository_permissions_from_user_group_owner( | ||||
user_util, backend_random): | ||||
user, user_group = user_util.create_user_with_group() | ||||
repo = backend_random.create_repo() | ||||
repo.user_id = user.user_id | ||||
RepoModel().grant_user_group_permission( | ||||
repo, user_group.users_group_name, 'repository.write') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories'][repo.repo_name] == 'repository.admin' | ||||
def test_cached_perms_data_user_repository_permissions( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
repo = backend_random.create_repo() | ||||
granted_permission = 'repository.write' | ||||
RepoModel().grant_user_permission(repo, user, granted_permission) | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories'][repo.repo_name] == granted_permission | ||||
def test_cached_perms_data_user_repository_permissions_explicit( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
repo = backend_random.create_repo() | ||||
granted_permission = 'repository.none' | ||||
RepoModel().grant_user_permission(repo, user, granted_permission) | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user, explicit=True) | ||||
assert permissions['repositories'][repo.repo_name] == granted_permission | ||||
def test_cached_perms_data_user_repository_permissions_owner( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
repo = backend_random.create_repo() | ||||
repo.user_id = user.user_id | ||||
RepoModel().grant_user_permission(repo, user, 'repository.write') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories'][repo.repo_name] == 'repository.admin' | ||||
def test_cached_perms_data_repository_groups_permissions_inherited( | ||||
user_util, backend_random): | ||||
user, user_group = user_util.create_user_with_group() | ||||
# Needs a second group to hit the last condition | ||||
user_group2 = user_util.create_user_group() | ||||
UserGroupModel().add_user_to_group(user_group2, user) | ||||
repo_group = user_util.create_repo_group() | ||||
user_util.grant_user_group_permission_to_repo_group( | ||||
repo_group, user_group, 'group.read') | ||||
user_util.grant_user_group_permission_to_repo_group( | ||||
repo_group, user_group2, 'group.write') | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'group.write' | ||||
def test_cached_perms_data_repository_groups_permissions_inherited_owner( | ||||
user_util, backend_random): | ||||
user, user_group = user_util.create_user_with_group() | ||||
repo_group = user_util.create_repo_group() | ||||
repo_group.user_id = user.user_id | ||||
granted_permission = 'group.write' | ||||
user_util.grant_user_group_permission_to_repo_group( | ||||
repo_group, user_group, granted_permission) | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'group.admin' | ||||
def test_cached_perms_data_repository_groups_permissions( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
repo_group = user_util.create_repo_group() | ||||
granted_permission = 'group.write' | ||||
user_util.grant_user_permission_to_repo_group( | ||||
repo_group, user, granted_permission) | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'group.write' | ||||
def test_cached_perms_data_repository_groups_permissions_explicit( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
repo_group = user_util.create_repo_group() | ||||
granted_permission = 'group.none' | ||||
user_util.grant_user_permission_to_repo_group( | ||||
repo_group, user, granted_permission) | ||||
permissions = get_permissions(user, explicit=True) | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'group.none' | ||||
def test_cached_perms_data_repository_groups_permissions_owner( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
repo_group = user_util.create_repo_group() | ||||
repo_group.user_id = user.user_id | ||||
granted_permission = 'group.write' | ||||
user_util.grant_user_permission_to_repo_group( | ||||
repo_group, user, granted_permission) | ||||
permissions = get_permissions(user) | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'group.admin' | ||||
def test_cached_perms_data_user_group_permissions_inherited( | ||||
user_util, backend_random): | ||||
user, user_group = user_util.create_user_with_group() | ||||
user_group2 = user_util.create_user_group() | ||||
UserGroupModel().add_user_to_group(user_group2, user) | ||||
target_user_group = user_util.create_user_group() | ||||
user_util.grant_user_group_permission_to_user_group( | ||||
target_user_group, user_group, 'usergroup.read') | ||||
user_util.grant_user_group_permission_to_user_group( | ||||
target_user_group, user_group2, 'usergroup.write') | ||||
permissions = get_permissions(user) | ||||
assert permissions['user_groups'][target_user_group.users_group_name] == \ | ||||
'usergroup.write' | ||||
def test_cached_perms_data_user_group_permissions( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
user_group = user_util.create_user_group() | ||||
UserGroupModel().grant_user_permission(user_group, user, 'usergroup.write') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['user_groups'][user_group.users_group_name] == \ | ||||
'usergroup.write' | ||||
def test_cached_perms_data_user_group_permissions_explicit( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
user_group = user_util.create_user_group() | ||||
UserGroupModel().grant_user_permission(user_group, user, 'usergroup.none') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user, explicit=True) | ||||
assert permissions['user_groups'][user_group.users_group_name] == \ | ||||
'usergroup.none' | ||||
def test_cached_perms_data_user_group_permissions_not_inheriting_defaults( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
user_group = user_util.create_user_group() | ||||
# Don't inherit default object permissions | ||||
UserModel().grant_perm(user, 'hg.inherit_default_perms.false') | ||||
r3981 | Session().commit() | |||
r1 | ||||
permissions = get_permissions(user) | ||||
assert permissions['user_groups'][user_group.users_group_name] == \ | ||||
'usergroup.none' | ||||
def test_permission_calculator_admin_permissions( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
user_group = user_util.create_user_group() | ||||
repo = backend_random.repo | ||||
repo_group = user_util.create_repo_group() | ||||
calculator = auth.PermissionCalculator( | ||||
user.user_id, {}, False, False, True, 'higherwin') | ||||
r2979 | permissions = calculator._calculate_admin_permissions() | |||
r1 | ||||
assert permissions['repositories_groups'][repo_group.group_name] == \ | ||||
'group.admin' | ||||
assert permissions['user_groups'][user_group.users_group_name] == \ | ||||
'usergroup.admin' | ||||
assert permissions['repositories'][repo.repo_name] == 'repository.admin' | ||||
assert 'hg.admin' in permissions['global'] | ||||
def test_permission_calculator_repository_permissions_robustness_from_group( | ||||
user_util, backend_random): | ||||
user, user_group = user_util.create_user_with_group() | ||||
RepoModel().grant_user_group_permission( | ||||
backend_random.repo, user_group.users_group_name, 'repository.write') | ||||
calculator = auth.PermissionCalculator( | ||||
user.user_id, {}, False, False, False, 'higherwin') | ||||
calculator._calculate_repository_permissions() | ||||
def test_permission_calculator_repository_permissions_robustness_from_user( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
RepoModel().grant_user_permission( | ||||
backend_random.repo, user, 'repository.write') | ||||
r3981 | Session().commit() | |||
r1 | ||||
calculator = auth.PermissionCalculator( | ||||
user.user_id, {}, False, False, False, 'higherwin') | ||||
calculator._calculate_repository_permissions() | ||||
def test_permission_calculator_repo_group_permissions_robustness_from_group( | ||||
user_util, backend_random): | ||||
user, user_group = user_util.create_user_with_group() | ||||
repo_group = user_util.create_repo_group() | ||||
user_util.grant_user_group_permission_to_repo_group( | ||||
repo_group, user_group, 'group.write') | ||||
calculator = auth.PermissionCalculator( | ||||
user.user_id, {}, False, False, False, 'higherwin') | ||||
calculator._calculate_repository_group_permissions() | ||||
def test_permission_calculator_repo_group_permissions_robustness_from_user( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
repo_group = user_util.create_repo_group() | ||||
user_util.grant_user_permission_to_repo_group( | ||||
repo_group, user, 'group.write') | ||||
calculator = auth.PermissionCalculator( | ||||
user.user_id, {}, False, False, False, 'higherwin') | ||||
calculator._calculate_repository_group_permissions() | ||||
def test_permission_calculator_user_group_permissions_robustness_from_group( | ||||
user_util, backend_random): | ||||
user, user_group = user_util.create_user_with_group() | ||||
target_user_group = user_util.create_user_group() | ||||
user_util.grant_user_group_permission_to_user_group( | ||||
target_user_group, user_group, 'usergroup.write') | ||||
calculator = auth.PermissionCalculator( | ||||
user.user_id, {}, False, False, False, 'higherwin') | ||||
calculator._calculate_user_group_permissions() | ||||
def test_permission_calculator_user_group_permissions_robustness_from_user( | ||||
user_util, backend_random): | ||||
user = user_util.create_user() | ||||
target_user_group = user_util.create_user_group() | ||||
user_util.grant_user_permission_to_user_group( | ||||
target_user_group, user, 'usergroup.write') | ||||
calculator = auth.PermissionCalculator( | ||||
user.user_id, {}, False, False, False, 'higherwin') | ||||
calculator._calculate_user_group_permissions() | ||||
@pytest.mark.parametrize("algo, new_permission, old_permission, expected", [ | ||||
('higherwin', 'repository.none', 'repository.none', 'repository.none'), | ||||
('higherwin', 'repository.read', 'repository.none', 'repository.read'), | ||||
('lowerwin', 'repository.write', 'repository.write', 'repository.write'), | ||||
('lowerwin', 'repository.read', 'repository.write', 'repository.read'), | ||||
]) | ||||
def test_permission_calculator_choose_permission( | ||||
user_regular, algo, new_permission, old_permission, expected): | ||||
calculator = auth.PermissionCalculator( | ||||
user_regular.user_id, {}, False, False, False, algo) | ||||
result = calculator._choose_permission(new_permission, old_permission) | ||||
assert result == expected | ||||
def test_permission_calculator_choose_permission_raises_on_wrong_algo( | ||||
user_regular): | ||||
calculator = auth.PermissionCalculator( | ||||
user_regular.user_id, {}, False, False, False, 'invalid') | ||||
result = calculator._choose_permission( | ||||
'repository.read', 'repository.read') | ||||
# TODO: johbo: This documents the existing behavior. Think of an | ||||
# improvement. | ||||
assert result is None | ||||
def test_auth_user_get_cookie_store_for_normal_user(user_util): | ||||
user = user_util.create_user() | ||||
auth_user = auth.AuthUser(user_id=user.user_id) | ||||
expected_data = { | ||||
'username': user.username, | ||||
'user_id': user.user_id, | ||||
'password': md5(user.password), | ||||
'is_authenticated': False | ||||
} | ||||
assert auth_user.get_cookie_store() == expected_data | ||||
def test_auth_user_get_cookie_store_for_default_user(): | ||||
default_user = User.get_default_user() | ||||
auth_user = auth.AuthUser() | ||||
expected_data = { | ||||
'username': User.DEFAULT_USER, | ||||
'user_id': default_user.user_id, | ||||
'password': md5(default_user.password), | ||||
'is_authenticated': True | ||||
} | ||||
assert auth_user.get_cookie_store() == expected_data | ||||
def get_permissions(user, **kwargs): | ||||
""" | ||||
Utility filling in useful defaults into the call to `_cached_perms_data`. | ||||
Fill in `**kwargs` if specific values are needed for a test. | ||||
""" | ||||
call_args = { | ||||
'user_id': user.user_id, | ||||
'scope': {}, | ||||
'user_is_admin': False, | ||||
'user_inherit_default_permissions': False, | ||||
'explicit': False, | ||||
'algo': 'higherwin', | ||||
r2065 | 'calculate_super_admin': False, | |||
r1 | } | |||
call_args.update(kwargs) | ||||
permissions = auth._cached_perms_data(**call_args) | ||||
return permissions | ||||
class TestGenerateAuthToken(object): | ||||
def test_salt_is_used_when_specified(self): | ||||
salt = 'abcde' | ||||
user_name = 'test_user' | ||||
result = auth.generate_auth_token(user_name, salt) | ||||
expected_result = sha1(user_name + salt).hexdigest() | ||||
assert result == expected_result | ||||
def test_salt_is_geneated_when_not_specified(self): | ||||
user_name = 'test_user' | ||||
random_salt = os.urandom(16) | ||||
with patch.object(auth, 'os') as os_mock: | ||||
os_mock.urandom.return_value = random_salt | ||||
result = auth.generate_auth_token(user_name) | ||||
expected_result = sha1(user_name + random_salt).hexdigest() | ||||
assert result == expected_result | ||||
r1421 | ||||
@pytest.mark.parametrize("test_token, test_roles, auth_result, expected_tokens", [ | ||||
('', None, False, | ||||
[]), | ||||
('wrongtoken', None, False, | ||||
[]), | ||||
('abracadabra_vcs', [AuthTokenModel.cls.ROLE_API], False, | ||||
[('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1)]), | ||||
('abracadabra_api', [AuthTokenModel.cls.ROLE_API], True, | ||||
[('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1)]), | ||||
('abracadabra_api', [AuthTokenModel.cls.ROLE_API], True, | ||||
[('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1), | ||||
('abracadabra_http', AuthTokenModel.cls.ROLE_HTTP, -1)]), | ||||
]) | ||||
def test_auth_by_token(test_token, test_roles, auth_result, expected_tokens, | ||||
user_util): | ||||
user = user_util.create_user() | ||||
user_id = user.user_id | ||||
for token, role, expires in expected_tokens: | ||||
r3981 | new_token = AuthTokenModel().create(user_id, u'test-token', expires, role) | |||
r1421 | new_token.api_key = token # inject known name for testing... | |||
assert auth_result == user.authenticate_by_token( | ||||
r1477 | test_token, roles=test_roles) | |||