# Copyright (C) 2010-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import pytest import mock from rhodecode.model.db import ( true, User, UserGroup, UserGroupMember, UserEmailMap, Permission, UserIpMap) from rhodecode.model.meta import Session from rhodecode.model.user import UserModel from rhodecode.model.user_group import UserGroupModel from rhodecode.model.repo import RepoModel from rhodecode.model.repo_group import RepoGroupModel from rhodecode.tests.fixture import Fixture from rhodecode.lib.str_utils import safe_str fixture = Fixture() class TestGetUsers(object): def test_returns_active_users(self, backend, user_util): for i in range(4): is_active = i % 2 == 0 user_util.create_user(active=is_active, lastname='Fake user') with mock.patch('rhodecode.lib.helpers.gravatar_url'): with mock.patch('rhodecode.lib.helpers.link_to_user'): users = UserModel().get_users() fake_users = [u for u in users if u['last_name'] == 'Fake user'] assert len(fake_users) == 2 expected_keys = ( 'id', 'first_name', 'last_name', 'username', 'icon_link', 'value_display', 'value', 'value_type') for user in users: assert user['value_type'] == 'user' for key in expected_keys: assert key in user def test_returns_user_filtered_by_last_name(self, backend, user_util): keywords = ('aBc', u'ünicode') for keyword in keywords: for i in range(2): user_util.create_user( active=True, lastname=u'Fake {} user'.format(keyword)) with mock.patch('rhodecode.lib.helpers.gravatar_url'): with mock.patch('rhodecode.lib.helpers.link_to_user'): keyword = keywords[1].lower() users = UserModel().get_users(name_contains=keyword) fake_users = [u for u in users if u['last_name'].startswith('Fake')] assert len(fake_users) == 2 for user in fake_users: assert user['last_name'] == safe_str('Fake ünicode user') def test_returns_user_filtered_by_first_name(self, backend, user_util): created_users = [] keywords = ('aBc', u'ünicode') for keyword in keywords: for i in range(2): created_users.append(user_util.create_user( active=True, lastname='Fake user', firstname=u'Fake {} user'.format(keyword))) keyword = keywords[1].lower() with mock.patch('rhodecode.lib.helpers.gravatar_url'): with mock.patch('rhodecode.lib.helpers.link_to_user'): users = UserModel().get_users(name_contains=keyword) fake_users = [u for u in users if u['last_name'].startswith('Fake')] assert len(fake_users) == 2 for user in fake_users: assert user['first_name'] == safe_str('Fake ünicode user') def test_returns_user_filtered_by_username(self, backend, user_util): created_users = [] for i in range(5): created_users.append(user_util.create_user( active=True, lastname='Fake user')) user_filter = created_users[-1].username[-2:] with mock.patch('rhodecode.lib.helpers.gravatar_url'): with mock.patch('rhodecode.lib.helpers.link_to_user'): users = UserModel().get_users(name_contains=user_filter) fake_users = [u for u in users if u['last_name'].startswith('Fake')] assert len(fake_users) == 1 assert fake_users[0]['username'] == created_users[-1].username def test_returns_limited_user_list(self, backend, user_util): created_users = [] for i in range(5): created_users.append(user_util.create_user( active=True, lastname='Fake user')) with mock.patch('rhodecode.lib.helpers.gravatar_url'): with mock.patch('rhodecode.lib.helpers.link_to_user'): users = UserModel().get_users(name_contains='Fake', limit=3) fake_users = [u for u in users if u['last_name'].startswith('Fake')] assert len(fake_users) == 3 @pytest.fixture() def test_user(request, baseapp): usr = UserModel().create_or_update( username='test_user', password='qweqwe', email='main_email@rhodecode.org', firstname='u1', lastname=u'u1') Session().commit() assert User.get_by_username(u'test_user') == usr @request.addfinalizer def cleanup(): if UserModel().get_user(usr.user_id) is None: return perm = Permission.query().all() for p in perm: UserModel().revoke_perm(usr, p) UserModel().delete(usr.user_id) Session().commit() return usr def test_create_and_remove(test_user): usr = test_user # make user group user_group = fixture.create_user_group('some_example_group') Session().commit() UserGroupModel().add_user_to_group(user_group, usr) Session().commit() assert UserGroup.get(user_group.users_group_id) == user_group assert UserGroupMember.query().count() == 1 UserModel().delete(usr.user_id) Session().commit() assert UserGroupMember.query().all() == [] def test_additonal_email_as_main(test_user): with pytest.raises(AttributeError): m = UserEmailMap() m.email = test_user.email m.user = test_user Session().add(m) Session().commit() def test_extra_email_map(test_user): m = UserEmailMap() m.email = u'main_email2@rhodecode.org' m.user = test_user Session().add(m) Session().commit() u = User.get_by_email(email='main_email@rhodecode.org') assert test_user.user_id == u.user_id assert test_user.username == u.username u = User.get_by_email(email='main_email2@rhodecode.org') assert test_user.user_id == u.user_id assert test_user.username == u.username u = User.get_by_email(email='main_email3@rhodecode.org') assert u is None def test_get_api_data_replaces_secret_data_by_default(test_user): api_data = test_user.get_api_data() api_key_length = 40 expected_replacement = '*' * api_key_length for key in api_data['auth_tokens']: assert key == expected_replacement def test_get_api_data_includes_secret_data_if_activated(test_user): api_data = test_user.get_api_data(include_secrets=True) assert api_data['auth_tokens'] == test_user.auth_tokens def test_add_perm(test_user): perm = Permission.query().all()[0] UserModel().grant_perm(test_user, perm) Session().commit() assert UserModel().has_perm(test_user, perm) def test_has_perm(test_user): perm = Permission.query().all() for p in perm: assert not UserModel().has_perm(test_user, p) def test_revoke_perm(test_user): perm = Permission.query().all()[0] UserModel().grant_perm(test_user, perm) Session().commit() assert UserModel().has_perm(test_user, perm) # revoke UserModel().revoke_perm(test_user, perm) Session().commit() assert not UserModel().has_perm(test_user, perm) @pytest.mark.parametrize("ip_range, expected, expect_errors", [ ('', [], False), ('127.0.0.1', ['127.0.0.1'], False), ('127.0.0.1,127.0.0.2', ['127.0.0.1', '127.0.0.2'], False), ('127.0.0.1 , 127.0.0.2', ['127.0.0.1', '127.0.0.2'], False), ( '127.0.0.1,172.172.172.0,127.0.0.2', ['127.0.0.1', '172.172.172.0', '127.0.0.2'], False), ( '127.0.0.1-127.0.0.5', ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'], False), ( '127.0.0.1 - 127.0.0.5', ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'], False ), ('-', [], True), ('127.0.0.1-32', [], True), ( '127.0.0.1,127.0.0.1,127.0.0.1,127.0.0.1-127.0.0.2,127.0.0.2', ['127.0.0.1', '127.0.0.2'], False), ( '127.0.0.1-127.0.0.2,127.0.0.4-127.0.0.6,', ['127.0.0.1', '127.0.0.2', '127.0.0.4', '127.0.0.5', '127.0.0.6'], False ), ( '127.0.0.1-127.0.0.2,127.0.0.1-127.0.0.6,', ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5', '127.0.0.6'], False ), ]) def test_ip_range_generator(ip_range, expected, expect_errors): func = UserModel().parse_ip_range if expect_errors: pytest.raises(ValueError, func, ip_range) else: parsed_list = func(ip_range) assert parsed_list == expected def test_user_delete_cascades_ip_whitelist(test_user): sample_ip = '1.1.1.1' uid_map = UserIpMap(user_id=test_user.user_id, ip_addr=sample_ip) Session().add(uid_map) Session().delete(test_user) try: Session().flush() finally: Session().rollback() def test_account_for_deactivation_generation(test_user): accounts = UserModel().get_accounts_in_creation_order( current_user=test_user) # current user should be #1 in the list assert accounts[0] == test_user.user_id active_users = User.query().filter(User.active == true()).count() assert active_users == len(accounts) def test_user_delete_cascades_permissions_on_repo(backend, test_user): test_repo = backend.create_repo() RepoModel().grant_user_permission( test_repo, test_user, 'repository.write') Session().commit() assert test_user.repo_to_perm UserModel().delete(test_user) Session().commit() def test_user_delete_cascades_permissions_on_repo_group( test_repo_group, test_user): RepoGroupModel().grant_user_permission( test_repo_group, test_user, 'group.write') Session().commit() assert test_user.repo_group_to_perm Session().delete(test_user) Session().commit() def test_user_delete_cascades_permissions_on_user_group( test_user_group, test_user): UserGroupModel().grant_user_permission( test_user_group, test_user, 'usergroup.write') Session().commit() assert test_user.user_group_to_perm Session().delete(test_user) Session().commit()