test_users.py
327 lines
| 10.7 KiB
| text/x-python
|
PythonLexer
r5608 | # Copyright (C) 2010-2024 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import pytest | ||||
r1677 | import mock | |||
r1 | ||||
r1677 | from rhodecode.model.db import ( | |||
true, User, UserGroup, UserGroupMember, UserEmailMap, Permission, UserIpMap) | ||||
r1 | from rhodecode.model.meta import Session | |||
from rhodecode.model.user import UserModel | ||||
from rhodecode.model.user_group import UserGroupModel | ||||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.repo_group import RepoGroupModel | ||||
r5607 | from rhodecode.tests.fixtures.rc_fixture import Fixture | |||
r5087 | from rhodecode.lib.str_utils import safe_str | |||
r1 | ||||
fixture = Fixture() | ||||
r1677 | class TestGetUsers(object): | |||
def test_returns_active_users(self, backend, user_util): | ||||
for i in range(4): | ||||
is_active = i % 2 == 0 | ||||
user_util.create_user(active=is_active, lastname='Fake user') | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
r2484 | with mock.patch('rhodecode.lib.helpers.link_to_user'): | |||
users = UserModel().get_users() | ||||
r1677 | fake_users = [u for u in users if u['last_name'] == 'Fake user'] | |||
assert len(fake_users) == 2 | ||||
expected_keys = ( | ||||
'id', 'first_name', 'last_name', 'username', 'icon_link', | ||||
'value_display', 'value', 'value_type') | ||||
for user in users: | ||||
r5089 | assert user['value_type'] == 'user' | |||
r1677 | for key in expected_keys: | |||
assert key in user | ||||
def test_returns_user_filtered_by_last_name(self, backend, user_util): | ||||
keywords = ('aBc', u'ünicode') | ||||
for keyword in keywords: | ||||
for i in range(2): | ||||
user_util.create_user( | ||||
active=True, lastname=u'Fake {} user'.format(keyword)) | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
r2484 | with mock.patch('rhodecode.lib.helpers.link_to_user'): | |||
keyword = keywords[1].lower() | ||||
users = UserModel().get_users(name_contains=keyword) | ||||
r1677 | ||||
fake_users = [u for u in users if u['last_name'].startswith('Fake')] | ||||
assert len(fake_users) == 2 | ||||
for user in fake_users: | ||||
r5087 | assert user['last_name'] == safe_str('Fake ünicode user') | |||
r1677 | ||||
def test_returns_user_filtered_by_first_name(self, backend, user_util): | ||||
created_users = [] | ||||
keywords = ('aBc', u'ünicode') | ||||
for keyword in keywords: | ||||
for i in range(2): | ||||
created_users.append(user_util.create_user( | ||||
active=True, lastname='Fake user', | ||||
firstname=u'Fake {} user'.format(keyword))) | ||||
keyword = keywords[1].lower() | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
r2484 | with mock.patch('rhodecode.lib.helpers.link_to_user'): | |||
users = UserModel().get_users(name_contains=keyword) | ||||
r1677 | ||||
fake_users = [u for u in users if u['last_name'].startswith('Fake')] | ||||
assert len(fake_users) == 2 | ||||
for user in fake_users: | ||||
r5087 | assert user['first_name'] == safe_str('Fake ünicode user') | |||
r1677 | ||||
def test_returns_user_filtered_by_username(self, backend, user_util): | ||||
created_users = [] | ||||
for i in range(5): | ||||
created_users.append(user_util.create_user( | ||||
active=True, lastname='Fake user')) | ||||
user_filter = created_users[-1].username[-2:] | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
r2484 | with mock.patch('rhodecode.lib.helpers.link_to_user'): | |||
users = UserModel().get_users(name_contains=user_filter) | ||||
r1677 | ||||
fake_users = [u for u in users if u['last_name'].startswith('Fake')] | ||||
assert len(fake_users) == 1 | ||||
assert fake_users[0]['username'] == created_users[-1].username | ||||
def test_returns_limited_user_list(self, backend, user_util): | ||||
created_users = [] | ||||
for i in range(5): | ||||
created_users.append(user_util.create_user( | ||||
active=True, lastname='Fake user')) | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
r2484 | with mock.patch('rhodecode.lib.helpers.link_to_user'): | |||
users = UserModel().get_users(name_contains='Fake', limit=3) | ||||
r1677 | ||||
fake_users = [u for u in users if u['last_name'].startswith('Fake')] | ||||
assert len(fake_users) == 3 | ||||
r3946 | @pytest.fixture() | |||
r2351 | def test_user(request, baseapp): | |||
r1 | usr = UserModel().create_or_update( | |||
r5087 | username='test_user', | |||
password='qweqwe', | ||||
email='main_email@rhodecode.org', | ||||
firstname='u1', lastname=u'u1') | ||||
r1 | Session().commit() | |||
assert User.get_by_username(u'test_user') == usr | ||||
@request.addfinalizer | ||||
def cleanup(): | ||||
if UserModel().get_user(usr.user_id) is None: | ||||
return | ||||
perm = Permission.query().all() | ||||
for p in perm: | ||||
UserModel().revoke_perm(usr, p) | ||||
UserModel().delete(usr.user_id) | ||||
Session().commit() | ||||
return usr | ||||
def test_create_and_remove(test_user): | ||||
usr = test_user | ||||
# make user group | ||||
user_group = fixture.create_user_group('some_example_group') | ||||
Session().commit() | ||||
UserGroupModel().add_user_to_group(user_group, usr) | ||||
Session().commit() | ||||
assert UserGroup.get(user_group.users_group_id) == user_group | ||||
assert UserGroupMember.query().count() == 1 | ||||
UserModel().delete(usr.user_id) | ||||
Session().commit() | ||||
assert UserGroupMember.query().all() == [] | ||||
def test_additonal_email_as_main(test_user): | ||||
with pytest.raises(AttributeError): | ||||
m = UserEmailMap() | ||||
m.email = test_user.email | ||||
m.user = test_user | ||||
Session().add(m) | ||||
Session().commit() | ||||
def test_extra_email_map(test_user): | ||||
m = UserEmailMap() | ||||
m.email = u'main_email2@rhodecode.org' | ||||
m.user = test_user | ||||
Session().add(m) | ||||
Session().commit() | ||||
u = User.get_by_email(email='main_email@rhodecode.org') | ||||
assert test_user.user_id == u.user_id | ||||
assert test_user.username == u.username | ||||
u = User.get_by_email(email='main_email2@rhodecode.org') | ||||
assert test_user.user_id == u.user_id | ||||
assert test_user.username == u.username | ||||
u = User.get_by_email(email='main_email3@rhodecode.org') | ||||
assert u is None | ||||
def test_get_api_data_replaces_secret_data_by_default(test_user): | ||||
api_data = test_user.get_api_data() | ||||
api_key_length = 40 | ||||
expected_replacement = '*' * api_key_length | ||||
r1953 | for key in api_data['auth_tokens']: | |||
r1 | assert key == expected_replacement | |||
def test_get_api_data_includes_secret_data_if_activated(test_user): | ||||
api_data = test_user.get_api_data(include_secrets=True) | ||||
r1953 | assert api_data['auth_tokens'] == test_user.auth_tokens | |||
r1 | ||||
def test_add_perm(test_user): | ||||
perm = Permission.query().all()[0] | ||||
UserModel().grant_perm(test_user, perm) | ||||
Session().commit() | ||||
assert UserModel().has_perm(test_user, perm) | ||||
def test_has_perm(test_user): | ||||
perm = Permission.query().all() | ||||
for p in perm: | ||||
assert not UserModel().has_perm(test_user, p) | ||||
def test_revoke_perm(test_user): | ||||
perm = Permission.query().all()[0] | ||||
UserModel().grant_perm(test_user, perm) | ||||
Session().commit() | ||||
assert UserModel().has_perm(test_user, perm) | ||||
# revoke | ||||
UserModel().revoke_perm(test_user, perm) | ||||
Session().commit() | ||||
assert not UserModel().has_perm(test_user, perm) | ||||
@pytest.mark.parametrize("ip_range, expected, expect_errors", [ | ||||
('', [], False), | ||||
('127.0.0.1', ['127.0.0.1'], False), | ||||
('127.0.0.1,127.0.0.2', ['127.0.0.1', '127.0.0.2'], False), | ||||
('127.0.0.1 , 127.0.0.2', ['127.0.0.1', '127.0.0.2'], False), | ||||
( | ||||
'127.0.0.1,172.172.172.0,127.0.0.2', | ||||
['127.0.0.1', '172.172.172.0', '127.0.0.2'], False), | ||||
( | ||||
'127.0.0.1-127.0.0.5', | ||||
['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'], | ||||
False), | ||||
( | ||||
'127.0.0.1 - 127.0.0.5', | ||||
['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'], | ||||
False | ||||
), | ||||
('-', [], True), | ||||
('127.0.0.1-32', [], True), | ||||
( | ||||
'127.0.0.1,127.0.0.1,127.0.0.1,127.0.0.1-127.0.0.2,127.0.0.2', | ||||
['127.0.0.1', '127.0.0.2'], False), | ||||
( | ||||
'127.0.0.1-127.0.0.2,127.0.0.4-127.0.0.6,', | ||||
['127.0.0.1', '127.0.0.2', '127.0.0.4', '127.0.0.5', '127.0.0.6'], | ||||
False | ||||
), | ||||
( | ||||
'127.0.0.1-127.0.0.2,127.0.0.1-127.0.0.6,', | ||||
['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5', | ||||
'127.0.0.6'], | ||||
False | ||||
), | ||||
]) | ||||
def test_ip_range_generator(ip_range, expected, expect_errors): | ||||
func = UserModel().parse_ip_range | ||||
if expect_errors: | ||||
r1907 | pytest.raises(ValueError, func, ip_range) | |||
r1 | else: | |||
parsed_list = func(ip_range) | ||||
assert parsed_list == expected | ||||
def test_user_delete_cascades_ip_whitelist(test_user): | ||||
sample_ip = '1.1.1.1' | ||||
uid_map = UserIpMap(user_id=test_user.user_id, ip_addr=sample_ip) | ||||
Session().add(uid_map) | ||||
Session().delete(test_user) | ||||
try: | ||||
Session().flush() | ||||
finally: | ||||
Session().rollback() | ||||
def test_account_for_deactivation_generation(test_user): | ||||
accounts = UserModel().get_accounts_in_creation_order( | ||||
current_user=test_user) | ||||
# current user should be #1 in the list | ||||
assert accounts[0] == test_user.user_id | ||||
active_users = User.query().filter(User.active == true()).count() | ||||
assert active_users == len(accounts) | ||||
def test_user_delete_cascades_permissions_on_repo(backend, test_user): | ||||
test_repo = backend.create_repo() | ||||
RepoModel().grant_user_permission( | ||||
test_repo, test_user, 'repository.write') | ||||
Session().commit() | ||||
assert test_user.repo_to_perm | ||||
UserModel().delete(test_user) | ||||
Session().commit() | ||||
def test_user_delete_cascades_permissions_on_repo_group( | ||||
test_repo_group, test_user): | ||||
RepoGroupModel().grant_user_permission( | ||||
test_repo_group, test_user, 'group.write') | ||||
Session().commit() | ||||
assert test_user.repo_group_to_perm | ||||
Session().delete(test_user) | ||||
Session().commit() | ||||
def test_user_delete_cascades_permissions_on_user_group( | ||||
test_user_group, test_user): | ||||
UserGroupModel().grant_user_permission( | ||||
test_user_group, test_user, 'usergroup.write') | ||||
Session().commit() | ||||
assert test_user.user_group_to_perm | ||||
Session().delete(test_user) | ||||
Session().commit() | ||||