# Copyright (C) 2010-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import pytest from sqlalchemy.orm.exc import NoResultFound from rhodecode.lib import auth from rhodecode.lib import helpers as h from rhodecode.model.db import User, UserApiKeys, UserEmailMap, Repository from rhodecode.model.meta import Session from rhodecode.model.user import UserModel from rhodecode.tests import ( TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash) from rhodecode.tests.fixture import Fixture from rhodecode.tests.routes import route_path fixture = Fixture() class TestAdminUsersView(TestController): def test_show_users(self): self.log_user() self.app.get(route_path('users')) def test_show_users_data(self, xhr_header): self.log_user() response = self.app.get(route_path( 'users_data'), extra_environ=xhr_header) all_users = User.query().filter( User.username != User.DEFAULT_USER).count() assert response.json['recordsTotal'] == all_users def test_show_users_data_filtered(self, xhr_header): self.log_user() response = self.app.get(route_path( 'users_data', params={'search[value]': 'empty_search'}), extra_environ=xhr_header) all_users = User.query().filter( User.username != User.DEFAULT_USER).count() assert response.json['recordsTotal'] == all_users assert response.json['recordsFiltered'] == 0 def test_auth_tokens_default_user(self): self.log_user() user = User.get_default_user() response = self.app.get( route_path('edit_user_auth_tokens', user_id=user.user_id), status=302) def test_auth_tokens(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) user_id = user.user_id auth_tokens = user.auth_tokens response = self.app.get( route_path('edit_user_auth_tokens', user_id=user_id)) for token in auth_tokens: response.mustcontain(token[:4]) response.mustcontain('never') @pytest.mark.parametrize("desc, lifetime", [ ('forever', -1), ('5mins', 60*5), ('30days', 60*60*24*30), ]) def test_add_auth_token(self, desc, lifetime, user_util): self.log_user() user = user_util.create_user() user_id = user.user_id response = self.app.post( route_path('edit_user_auth_tokens_add', user_id=user_id), {'description': desc, 'lifetime': lifetime, 'csrf_token': self.csrf_token}) assert_session_flash(response, 'Auth token successfully created') response = response.follow() user = User.get(user_id) for auth_token in user.auth_tokens: response.mustcontain(auth_token[:4]) def test_delete_auth_token(self, user_util): self.log_user() user = user_util.create_user() user_id = user.user_id keys = user.auth_tokens assert 2 == len(keys) response = self.app.post( route_path('edit_user_auth_tokens_add', user_id=user_id), {'description': 'desc', 'lifetime': -1, 'csrf_token': self.csrf_token}) assert_session_flash(response, 'Auth token successfully created') response.follow() # now delete our key keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all() assert 3 == len(keys) response = self.app.post( route_path('edit_user_auth_tokens_delete', user_id=user_id), {'del_auth_token': keys[0].user_api_key_id, 'csrf_token': self.csrf_token}) assert_session_flash(response, 'Auth token successfully deleted') keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all() assert 2 == len(keys) def test_ips(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) response = self.app.get(route_path('edit_user_ips', user_id=user.user_id)) response.mustcontain('All IP addresses are allowed') @pytest.mark.parametrize("test_name, ip, ip_range, failure", [ ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False), ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False), ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False), ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False), ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True), ('127_bad_ip', 'foobar', 'foobar', True), ]) def test_ips_add(self, user_util, test_name, ip, ip_range, failure): self.log_user() user = user_util.create_user(username=test_name) user_id = user.user_id response = self.app.post( route_path('edit_user_ips_add', user_id=user_id), params={'new_ip': ip, 'csrf_token': self.csrf_token}) if failure: assert_session_flash( response, 'Please enter a valid IPv4 or IpV6 address') response = self.app.get(route_path('edit_user_ips', user_id=user_id)) response.mustcontain(no=[ip]) response.mustcontain(no=[ip_range]) else: response = self.app.get(route_path('edit_user_ips', user_id=user_id)) response.mustcontain(ip) response.mustcontain(ip_range) def test_ips_delete(self, user_util): self.log_user() user = user_util.create_user() user_id = user.user_id ip = '127.0.0.1/32' ip_range = '127.0.0.1 - 127.0.0.1' new_ip = UserModel().add_extra_ip(user_id, ip) Session().commit() new_ip_id = new_ip.ip_id response = self.app.get(route_path('edit_user_ips', user_id=user_id)) response.mustcontain(ip) response.mustcontain(ip_range) self.app.post( route_path('edit_user_ips_delete', user_id=user_id), params={'del_ip_id': new_ip_id, 'csrf_token': self.csrf_token}) response = self.app.get(route_path('edit_user_ips', user_id=user_id)) response.mustcontain('All IP addresses are allowed') response.mustcontain(no=[ip]) response.mustcontain(no=[ip_range]) def test_emails(self): self.log_user() user = User.get_by_username(TEST_USER_REGULAR_LOGIN) response = self.app.get( route_path('edit_user_emails', user_id=user.user_id)) response.mustcontain('No additional emails specified') def test_emails_add(self, user_util): self.log_user() user = user_util.create_user() user_id = user.user_id self.app.post( route_path('edit_user_emails_add', user_id=user_id), params={'new_email': 'example@rhodecode.com', 'csrf_token': self.csrf_token}) response = self.app.get( route_path('edit_user_emails', user_id=user_id)) response.mustcontain('example@rhodecode.com') def test_emails_add_existing_email(self, user_util, user_regular): existing_email = user_regular.email self.log_user() user = user_util.create_user() user_id = user.user_id response = self.app.post( route_path('edit_user_emails_add', user_id=user_id), params={'new_email': existing_email, 'csrf_token': self.csrf_token}) assert_session_flash( response, 'This e-mail address is already taken') response = self.app.get( route_path('edit_user_emails', user_id=user_id)) response.mustcontain(no=[existing_email]) def test_emails_delete(self, user_util): self.log_user() user = user_util.create_user() user_id = user.user_id self.app.post( route_path('edit_user_emails_add', user_id=user_id), params={'new_email': 'example@rhodecode.com', 'csrf_token': self.csrf_token}) response = self.app.get( route_path('edit_user_emails', user_id=user_id)) response.mustcontain('example@rhodecode.com') user_email = UserEmailMap.query()\ .filter(UserEmailMap.email == 'example@rhodecode.com') \ .filter(UserEmailMap.user_id == user_id)\ .one() del_email_id = user_email.email_id self.app.post( route_path('edit_user_emails_delete', user_id=user_id), params={'del_email_id': del_email_id, 'csrf_token': self.csrf_token}) response = self.app.get( route_path('edit_user_emails', user_id=user_id)) response.mustcontain(no=['example@rhodecode.com']) def test_create(self, request, xhr_header): self.log_user() username = 'newtestuser' password = 'test12' password_confirmation = password name = 'name' lastname = 'lastname' email = 'mail@mail.com' self.app.get(route_path('users_new')) response = self.app.post(route_path('users_create'), params={ 'username': username, 'password': password, 'description': 'mr CTO', 'password_confirmation': password_confirmation, 'firstname': name, 'active': True, 'lastname': lastname, 'extern_name': 'rhodecode', 'extern_type': 'rhodecode', 'email': email, 'csrf_token': self.csrf_token, }) user_link = h.link_to( username, route_path( 'user_edit', user_id=User.get_by_username(username).user_id)) assert_session_flash(response, 'Created user %s' % (user_link,)) @request.addfinalizer def cleanup(): fixture.destroy_user(username) Session().commit() new_user = User.query().filter(User.username == username).one() assert new_user.username == username assert auth.check_password(password, new_user.password) assert new_user.name == name assert new_user.lastname == lastname assert new_user.email == email response = self.app.get(route_path('users_data'), extra_environ=xhr_header) response.mustcontain(username) def test_create_err(self): self.log_user() username = 'new_user' password = '' name = 'name' lastname = 'lastname' email = 'errmail.com' self.app.get(route_path('users_new')) response = self.app.post(route_path('users_create'), params={ 'username': username, 'password': password, 'name': name, 'active': False, 'lastname': lastname, 'description': 'mr CTO', 'email': email, 'csrf_token': self.csrf_token, }) msg = u'Username "%(username)s" is forbidden' msg = h.html_escape(msg % {'username': 'new_user'}) response.mustcontain('%s' % msg) response.mustcontain( 'Please enter a value') response.mustcontain( 'An email address must contain a' ' single @') def get_user(): Session().query(User).filter(User.username == username).one() with pytest.raises(NoResultFound): get_user() def test_new(self): self.log_user() self.app.get(route_path('users_new')) @pytest.mark.parametrize("name, attrs", [ ('firstname', {'firstname': 'new_username'}), ('lastname', {'lastname': 'new_username'}), ('admin', {'admin': True}), ('admin', {'admin': False}), ('extern_type', {'extern_type': 'ldap'}), ('extern_type', {'extern_type': None}), ('extern_name', {'extern_name': 'test'}), ('extern_name', {'extern_name': None}), ('active', {'active': False}), ('active', {'active': True}), ('email', {'email': 'some@email.com'}), ('language', {'language': 'de'}), ('language', {'language': 'en'}), ('description', {'description': 'hello CTO'}), # ('new_password', {'new_password': 'foobar123', # 'password_confirmation': 'foobar123'}) ]) def test_update(self, name, attrs, user_util): self.log_user() usr = user_util.create_user( password='qweqwe', email='testme@rhodecode.org', extern_type='rhodecode', extern_name='xxx', ) user_id = usr.user_id Session().commit() params = usr.get_api_data() cur_lang = params['language'] or 'en' params.update({ 'password_confirmation': '', 'new_password': '', 'language': cur_lang, 'csrf_token': self.csrf_token, }) params.update({'new_password': ''}) params.update(attrs) if name == 'email': params['emails'] = [attrs['email']] elif name == 'extern_type': # cannot update this via form, expected value is original one params['extern_type'] = "rhodecode" elif name == 'extern_name': # cannot update this via form, expected value is original one params['extern_name'] = 'xxx' # special case since this user is not # logged in yet his data is not filled # so we use creation data response = self.app.post( route_path('user_update', user_id=usr.user_id), params) assert response.status_int == 302 assert_session_flash(response, 'User updated successfully') updated_user = User.get(user_id) updated_params = updated_user.get_api_data() updated_params.update({'password_confirmation': ''}) updated_params.update({'new_password': ''}) del params['csrf_token'] assert params == updated_params def test_update_and_migrate_password( self, autologin_user, real_crypto_backend, user_util): user = user_util.create_user() temp_user = user.username user.password = auth._RhodeCodeCryptoSha256().hash_create( b'test123') Session().add(user) Session().commit() params = user.get_api_data() params.update({ 'password_confirmation': 'qweqwe123', 'new_password': 'qweqwe123', 'language': 'en', 'csrf_token': autologin_user.csrf_token, }) response = self.app.post( route_path('user_update', user_id=user.user_id), params) assert response.status_int == 302 assert_session_flash(response, 'User updated successfully') # new password should be bcrypted, after log-in and transfer user = User.get_by_username(temp_user) assert user.password.startswith('$') updated_user = User.get_by_username(temp_user) updated_params = updated_user.get_api_data() updated_params.update({'password_confirmation': 'qweqwe123'}) updated_params.update({'new_password': 'qweqwe123'}) del params['csrf_token'] assert params == updated_params def test_delete(self): self.log_user() username = 'newtestuserdeleteme' fixture.create_user(name=username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'csrf_token': self.csrf_token}) assert_session_flash(response, 'Successfully deleted user `{}`'.format(username)) def test_delete_owner_of_repository(self, request, user_util): self.log_user() obj_name = 'test_repo' usr = user_util.create_user() username = usr.username fixture.create_repo(obj_name, cur_user=usr.username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'csrf_token': self.csrf_token}) msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \ 'Switch owners or remove those repositories:%s' % (username, obj_name) assert_session_flash(response, msg) fixture.destroy_repo(obj_name) def test_delete_owner_of_repository_detaching(self, request, user_util): self.log_user() obj_name = 'test_repo' usr = user_util.create_user(auto_cleanup=False) username = usr.username fixture.create_repo(obj_name, cur_user=usr.username) Session().commit() new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'user_repos': 'detach', 'csrf_token': self.csrf_token}) msg = 'Detached 1 repositories' assert_session_flash(response, msg) fixture.destroy_repo(obj_name) def test_delete_owner_of_repository_deleting(self, request, user_util): self.log_user() obj_name = 'test_repo' usr = user_util.create_user(auto_cleanup=False) username = usr.username fixture.create_repo(obj_name, cur_user=usr.username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'user_repos': 'delete', 'csrf_token': self.csrf_token}) msg = 'Deleted 1 repositories' assert_session_flash(response, msg) def test_delete_owner_of_repository_group(self, request, user_util): self.log_user() obj_name = 'test_group' usr = user_util.create_user() username = usr.username fixture.create_repo_group(obj_name, cur_user=usr.username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'csrf_token': self.csrf_token}) msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \ 'Switch owners or remove those repository groups:%s' % (username, obj_name) assert_session_flash(response, msg) fixture.destroy_repo_group(obj_name) def test_delete_owner_of_repository_group_detaching(self, request, user_util): self.log_user() obj_name = 'test_group' usr = user_util.create_user(auto_cleanup=False) username = usr.username fixture.create_repo_group(obj_name, cur_user=usr.username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'user_repo_groups': 'delete', 'csrf_token': self.csrf_token}) msg = 'Deleted 1 repository groups' assert_session_flash(response, msg) def test_delete_owner_of_repository_group_deleting(self, request, user_util): self.log_user() obj_name = 'test_group' usr = user_util.create_user(auto_cleanup=False) username = usr.username fixture.create_repo_group(obj_name, cur_user=usr.username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'user_repo_groups': 'detach', 'csrf_token': self.csrf_token}) msg = 'Detached 1 repository groups' assert_session_flash(response, msg) fixture.destroy_repo_group(obj_name) def test_delete_owner_of_user_group(self, request, user_util): self.log_user() obj_name = 'test_user_group' usr = user_util.create_user() username = usr.username fixture.create_user_group(obj_name, cur_user=usr.username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'csrf_token': self.csrf_token}) msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \ 'Switch owners or remove those user groups:%s' % (username, obj_name) assert_session_flash(response, msg) fixture.destroy_user_group(obj_name) def test_delete_owner_of_user_group_detaching(self, request, user_util): self.log_user() obj_name = 'test_user_group' usr = user_util.create_user(auto_cleanup=False) username = usr.username fixture.create_user_group(obj_name, cur_user=usr.username) new_user = Session().query(User)\ .filter(User.username == username).one() try: response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'user_user_groups': 'detach', 'csrf_token': self.csrf_token}) msg = 'Detached 1 user groups' assert_session_flash(response, msg) finally: fixture.destroy_user_group(obj_name) def test_delete_owner_of_user_group_deleting(self, request, user_util): self.log_user() obj_name = 'test_user_group' usr = user_util.create_user(auto_cleanup=False) username = usr.username fixture.create_user_group(obj_name, cur_user=usr.username) new_user = Session().query(User)\ .filter(User.username == username).one() response = self.app.post( route_path('user_delete', user_id=new_user.user_id), params={'user_user_groups': 'delete', 'csrf_token': self.csrf_token}) msg = 'Deleted 1 user groups' assert_session_flash(response, msg) def test_edit(self, user_util): self.log_user() user = user_util.create_user() self.app.get(route_path('user_edit', user_id=user.user_id)) def test_edit_default_user_redirect(self): self.log_user() user = User.get_default_user() self.app.get(route_path('user_edit', user_id=user.user_id), status=302) @pytest.mark.parametrize( 'repo_create, repo_create_write, user_group_create, repo_group_create,' 'fork_create, inherit_default_permissions, expect_error,' 'expect_form_error', [ ('hg.create.none', 'hg.create.write_on_repogroup.false', 'hg.usergroup.create.false', 'hg.repogroup.create.false', 'hg.fork.none', 'hg.inherit_default_perms.false', False, False), ('hg.create.repository', 'hg.create.write_on_repogroup.false', 'hg.usergroup.create.false', 'hg.repogroup.create.false', 'hg.fork.none', 'hg.inherit_default_perms.false', False, False), ('hg.create.repository', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, False), ('hg.create.XXX', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, True), ('', '', '', '', '', '', True, False), ]) def test_global_perms_on_user( self, repo_create, repo_create_write, user_group_create, repo_group_create, fork_create, expect_error, expect_form_error, inherit_default_permissions, user_util): self.log_user() user = user_util.create_user() uid = user.user_id # ENABLE REPO CREATE ON A GROUP perm_params = { 'inherit_default_permissions': False, 'default_repo_create': repo_create, 'default_repo_create_on_write': repo_create_write, 'default_user_group_create': user_group_create, 'default_repo_group_create': repo_group_create, 'default_fork_create': fork_create, 'default_inherit_default_permissions': inherit_default_permissions, 'csrf_token': self.csrf_token, } response = self.app.post( route_path('user_edit_global_perms_update', user_id=uid), params=perm_params) if expect_form_error: assert response.status_int == 200 response.mustcontain('Value must be one of') else: if expect_error: msg = 'An error occurred during permissions saving' else: msg = 'User global permissions updated successfully' ug = User.get(uid) del perm_params['inherit_default_permissions'] del perm_params['csrf_token'] assert perm_params == ug.get_default_perms() assert_session_flash(response, msg) def test_global_permissions_initial_values(self, user_util): self.log_user() user = user_util.create_user() uid = user.user_id response = self.app.get( route_path('user_edit_global_perms', user_id=uid)) default_user = User.get_default_user() default_permissions = default_user.get_default_perms() assert_response = response.assert_response() expected_permissions = ( 'default_repo_create', 'default_repo_create_on_write', 'default_fork_create', 'default_repo_group_create', 'default_user_group_create', 'default_inherit_default_permissions') for permission in expected_permissions: css_selector = '[name={}][checked=checked]'.format(permission) element = assert_response.get_element(css_selector) assert element.value == default_permissions[permission] def test_perms_summary_page(self): user = self.log_user() response = self.app.get( route_path('edit_user_perms_summary', user_id=user['user_id'])) for repo in Repository.query().all(): response.mustcontain(repo.repo_name) def test_perms_summary_page_json(self): user = self.log_user() response = self.app.get( route_path('edit_user_perms_summary_json', user_id=user['user_id'])) for repo in Repository.query().all(): response.mustcontain(repo.repo_name) def test_audit_log_page(self): user = self.log_user() self.app.get( route_path('edit_user_audit_logs', user_id=user['user_id'])) def test_audit_log_page_download(self): user = self.log_user() user_id = user['user_id'] response = self.app.get( route_path('edit_user_audit_logs_download', user_id=user_id)) assert response.content_disposition == \ 'attachment; filename=user_{}_audit_logs.json'.format(user_id) assert response.content_type == "application/json"