test_admin_users.py
575 lines
| 23.1 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
r1271 | # Copyright (C) 2010-2017 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import pytest | ||||
from sqlalchemy.orm.exc import NoResultFound | ||||
from rhodecode.lib.auth import check_password | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.model import validators | ||||
from rhodecode.model.db import User, UserIpMap, UserApiKeys | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.user import UserModel | ||||
from rhodecode.tests import ( | ||||
TestController, url, link_to, TEST_USER_ADMIN_LOGIN, | ||||
TEST_USER_REGULAR_LOGIN, assert_session_flash) | ||||
from rhodecode.tests.fixture import Fixture | ||||
from rhodecode.tests.utils import AssertResponse | ||||
fixture = Fixture() | ||||
def route_path(name, params=None, **kwargs):
    """Build the URL path of a named admin route used by these tests.

    :param name: route name; only ``'users_data'`` is defined here, any
        other name raises ``KeyError``.
    :param params: optional query arguments; when truthy they are
        url-encoded and appended after a ``?``.
    :param kwargs: values substituted into ``{placeholder}`` fields of the
        route pattern via ``str.format``.
    :return: the route path, including the query string if one was given.
    """
    # ``urlencode`` lives in ``urllib.parse`` on Python 3 and directly in
    # ``urllib`` on Python 2; resolve it so the helper works on both.
    try:
        from urllib.parse import urlencode
    except ImportError:
        from urllib import urlencode
    from rhodecode.apps._base import ADMIN_PREFIX

    route_patterns = {
        'users_data': ADMIN_PREFIX + '/users_data',
    }
    base_url = route_patterns[name].format(**kwargs)

    if params:
        base_url = '{}?{}'.format(base_url, urlencode(params))
    return base_url
r1 | class TestAdminUsersController(TestController): | |||
test_user_1 = 'testme' | ||||
destroy_users = set() | ||||
@classmethod
def teardown_method(cls, method):
    """Destroy every user name collected in ``destroy_users``.

    :param method: the test method that just ran (unused).
    """
    registered_usernames = cls.destroy_users
    fixture.destroy_users(registered_usernames)
def test_create(self, xhr_header):
    """Create a user through the admin form and verify what was stored.

    Checks the success flash message, the persisted attributes (including
    that the stored password matches the submitted one) and that the new
    username shows up in the ``users_data`` response.

    :param xhr_header: environ passed as ``extra_environ`` when requesting
        the ``users_data`` route.
    """
    self.log_user()
    username = 'newtestuser'
    password = 'test12'
    password_confirmation = password
    name = 'name'
    lastname = 'lastname'
    email = 'mail@mail.com'

    # Register for cleanup up front: if any assertion below fails, the
    # account must still be removed by teardown instead of leaking into
    # later tests.
    self.destroy_users.add(username)

    self.app.get(url('new_user'))

    response = self.app.post(url('users'), params={
        'username': username,
        'password': password,
        'password_confirmation': password_confirmation,
        'firstname': name,
        'active': True,
        'lastname': lastname,
        'extern_name': 'rhodecode',
        'extern_type': 'rhodecode',
        'email': email,
        'csrf_token': self.csrf_token,
    })

    user_link = link_to(
        username,
        url('edit_user', user_id=User.get_by_username(username).user_id))
    assert_session_flash(response, 'Created user %s' % (user_link,))

    new_user = User.query().filter(User.username == username).one()

    assert new_user.username == username
    assert check_password(password, new_user.password)
    assert new_user.name == name
    assert new_user.lastname == lastname
    assert new_user.email == email

    response = self.app.get(route_path('users_data'),
                            extra_environ=xhr_header)
    response.mustcontain(username)
def test_create_err(self):
    """Submit an invalid user form and expect errors, not a new user.

    The response must contain the ``system_invalid_username`` message of
    ``ValidUsername``, the "Please enter a value" error and the email
    format error; afterwards no ``User`` row with that name may exist.
    """
    self.log_user()
    username = 'new_user'

    self.app.get(url('new_user'))

    form_data = {
        'username': username,
        'password': '',
        'name': 'name',
        'active': False,
        'lastname': 'lastname',
        'email': 'errmail.com',
        'csrf_token': self.csrf_token,
    }
    response = self.app.post(url('users'), params=form_data)

    username_error = validators.ValidUsername(
        False, {})._messages['system_invalid_username']
    username_error = h.html_escape(username_error % {'username': username})
    response.mustcontain(
        '<span class="error-message">%s</span>' % username_error)
    response.mustcontain(
        '<span class="error-message">Please enter a value</span>')
    response.mustcontain(
        '<span class="error-message">An email address must contain a'
        ' single @</span>')

    # The rejected form must not have created the account.
    with pytest.raises(NoResultFound):
        Session().query(User).filter(User.username == username).one()
def test_new(self):
    """Log in and request the ``new_user`` page; nothing else is asserted."""
    self.log_user()
    new_user_page = url('new_user')
    self.app.get(new_user_page)
@pytest.mark.parametrize("name, attrs", [
    ('firstname', {'firstname': 'new_username'}),
    ('lastname', {'lastname': 'new_username'}),
    ('admin', {'admin': True}),
    ('admin', {'admin': False}),
    ('extern_type', {'extern_type': 'ldap'}),
    ('extern_type', {'extern_type': None}),
    ('extern_name', {'extern_name': 'test'}),
    ('extern_name', {'extern_name': None}),
    ('active', {'active': False}),
    ('active', {'active': True}),
    ('email', {'email': 'some@email.com'}),
    ('language', {'language': 'de'}),
    ('language', {'language': 'en'}),
    # ('new_password', {'new_password': 'foobar123',
    #                   'password_confirmation': 'foobar123'})
])
def test_update(self, name, attrs):
    """Update one attribute of a user via the edit form and compare.

    The posted form is built from the user's API data plus `attrs`; after
    the request the user's fresh API data must equal what was posted.

    :param name: which attribute the case is about; selects the special
        handling below.
    :param attrs: form values merged over the user's current data.
    """
    self.log_user()
    usr = fixture.create_user(
        self.test_user_1,
        password='qweqwe',
        email='testme@rhodecode.org',
        extern_type='rhodecode',
        extern_name=self.test_user_1,
        skip_if_exists=True)
    Session().commit()
    self.destroy_users.add(self.test_user_1)

    params = usr.get_api_data()
    form_extras = {
        'password_confirmation': '',
        'new_password': '',
        # fall back to English when the user has no language stored
        'language': params['language'] or 'en',
        '_method': 'put',
        'csrf_token': self.csrf_token,
    }
    params.update(form_extras)
    params.update(attrs)

    if name == 'email':
        params['emails'] = [attrs['email']]
    elif name == 'extern_type':
        # cannot update this via form, expected value is original one
        params['extern_type'] = "rhodecode"
    elif name == 'extern_name':
        # cannot update this via form, expected value is original one.
        # NOTE(review): the original remark says the user has not logged
        # in yet, so the creation data is used here - confirm.
        params['extern_name'] = self.test_user_1

    response = self.app.post(url('user', user_id=usr.user_id), params)
    assert response.status_int == 302
    assert_session_flash(response, 'User updated successfully')

    updated_params = User.get_by_username(self.test_user_1).get_api_data()
    updated_params.update({'password_confirmation': ''})
    updated_params.update({'new_password': ''})

    # transport-only fields are not part of the user's API data
    for transport_key in ('_method', 'csrf_token'):
        del params[transport_key]
    assert params == updated_params
def test_update_and_migrate_password(
        self, autologin_user, real_crypto_backend):
    """Change the password of a user whose stored hash is SHA-256 based.

    After the update the stored password must start with ``$``
    (presumably the bcrypt marker - see the inline remark), and the
    user's API data must match the posted form.

    :param autologin_user: supplies the ``csrf_token`` for the request.
    :param real_crypto_backend: fixture requested for its effect only.
    """
    from rhodecode.lib import auth

    # create new user, with sha256 password
    temp_user = 'test_admin_sha256'
    user = fixture.create_user(temp_user)
    sha256_backend = auth._RhodeCodeCryptoSha256()
    user.password = sha256_backend.hash_create(b'test123')
    Session().add(user)
    Session().commit()
    self.destroy_users.add(temp_user)

    new_password = 'qweqwe123'
    params = user.get_api_data()
    params.update({
        'password_confirmation': new_password,
        'new_password': new_password,
        'language': 'en',
        '_method': 'put',
        'csrf_token': autologin_user.csrf_token,
    })

    response = self.app.post(url('user', user_id=user.user_id), params)
    assert response.status_int == 302
    assert_session_flash(response, 'User updated successfully')

    # new password should be bcrypted, after log-in and transfer
    user = User.get_by_username(temp_user)
    assert user.password.startswith('$')

    updated_user = User.get_by_username(temp_user)
    updated_params = updated_user.get_api_data()
    updated_params.update({'password_confirmation': new_password})
    updated_params.update({'new_password': new_password})

    # transport-only fields are not part of the user's API data
    for transport_key in ('_method', 'csrf_token'):
        del params[transport_key]
    assert params == updated_params
def test_delete(self):
    """Delete a freshly created user through the admin controller.

    Expects the 'Successfully deleted user' flash message.
    """
    self.log_user()
    username = 'newtestuserdeleteme'

    fixture.create_user(name=username)
    # Register for teardown like the sibling delete tests do, so the
    # account does not survive when the deletion under test fails.
    self.destroy_users.add(username)

    new_user = Session().query(User)\
        .filter(User.username == username).one()
    response = self.app.post(url('user', user_id=new_user.user_id),
                             params={'_method': 'delete',
                                     'csrf_token': self.csrf_token})

    assert_session_flash(response, 'Successfully deleted user')
def test_delete_owner_of_repository(self):
    """Deleting a user who still owns a repository must be refused.

    Expects the flash message that names the user and the owned
    repository.
    """
    self.log_user()
    username = 'newtestuserdeleteme_repo_owner'
    obj_name = 'test_repo'
    usr = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_repo(obj_name, cur_user=usr.username)

    # Always remove the repository, even when an assertion fails; sibling
    # tests reuse the same repository name.
    try:
        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
              'Switch owners or remove those repositories:%s' % (username,
                                                                 obj_name)
        assert_session_flash(response, msg)
    finally:
        fixture.destroy_repo(obj_name)
def test_delete_owner_of_repository_detaching(self):
    """Delete a repository owner while asking to detach the repositories.

    Posts ``user_repos=detach`` and expects the 'Detached 1 repositories'
    flash message.
    """
    self.log_user()
    username = 'newtestuserdeleteme_repo_owner_detach'
    obj_name = 'test_repo'
    usr = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_repo(obj_name, cur_user=usr.username)

    # Always remove the repository, even when an assertion fails; sibling
    # tests reuse the same repository name.
    try:
        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'user_repos': 'detach',
                                         'csrf_token': self.csrf_token})

        msg = 'Detached 1 repositories'
        assert_session_flash(response, msg)
    finally:
        fixture.destroy_repo(obj_name)
def test_delete_owner_of_repository_deleting(self):
    """Delete a repository owner while asking to delete the repositories.

    Posts ``user_repos=delete`` and expects the 'Deleted 1 repositories'
    flash message.
    """
    self.log_user()
    username = 'newtestuserdeleteme_repo_owner_delete'
    obj_name = 'test_repo'
    owner = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_repo(obj_name, cur_user=owner.username)

    user_query = Session().query(User).filter(User.username == username)
    new_user = user_query.one()

    delete_form = {
        '_method': 'delete',
        'user_repos': 'delete',
        'csrf_token': self.csrf_token,
    }
    response = self.app.post(
        url('user', user_id=new_user.user_id), params=delete_form)

    assert_session_flash(response, 'Deleted 1 repositories')
def test_delete_owner_of_repository_group(self):
    """Deleting a user who still owns a repository group must be refused.

    Expects the flash message that names the user and the owned
    repository group.
    """
    self.log_user()
    username = 'newtestuserdeleteme_repo_group_owner'
    obj_name = 'test_group'
    usr = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_repo_group(obj_name, cur_user=usr.username)

    # Always remove the group, even when an assertion fails; sibling
    # tests reuse the same group name.
    try:
        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
              'Switch owners or remove those repository groups:%s' % (username,
                                                                      obj_name)
        assert_session_flash(response, msg)
    finally:
        fixture.destroy_repo_group(obj_name)
def test_delete_owner_of_repository_group_detaching(self):
    """Delete a repository-group owner while detaching the groups.

    Posts ``user_repo_groups=detach`` and expects the
    'Detached 1 repository groups' flash message. The request previously
    sent ``delete`` here, i.e. the bodies of this test and of
    ``test_delete_owner_of_repository_group_deleting`` were swapped
    relative to their names.
    """
    self.log_user()
    username = 'newtestuserdeleteme_repo_group_owner_detach'
    obj_name = 'test_group'
    usr = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_repo_group(obj_name, cur_user=usr.username)

    # A detached group still exists afterwards, so remove it explicitly,
    # also when an assertion fails.
    try:
        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'user_repo_groups': 'detach',
                                         'csrf_token': self.csrf_token})

        msg = 'Detached 1 repository groups'
        assert_session_flash(response, msg)
    finally:
        fixture.destroy_repo_group(obj_name)

def test_delete_owner_of_repository_group_deleting(self):
    """Delete a repository-group owner while deleting the groups.

    Posts ``user_repo_groups=delete`` and expects the
    'Deleted 1 repository groups' flash message. No explicit group
    cleanup follows, matching the behaviour of the case that previously
    exercised the ``delete`` action.
    """
    self.log_user()
    username = 'newtestuserdeleteme_repo_group_owner_delete'
    obj_name = 'test_group'
    usr = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_repo_group(obj_name, cur_user=usr.username)

    new_user = Session().query(User)\
        .filter(User.username == username).one()
    response = self.app.post(url('user', user_id=new_user.user_id),
                             params={'_method': 'delete',
                                     'user_repo_groups': 'delete',
                                     'csrf_token': self.csrf_token})

    msg = 'Deleted 1 repository groups'
    assert_session_flash(response, msg)
def test_delete_owner_of_user_group(self):
    """Deleting a user who still owns a user group must be refused.

    Expects the flash message that names the user and the owned user
    group.
    """
    self.log_user()
    username = 'newtestuserdeleteme_user_group_owner'
    obj_name = 'test_user_group'
    usr = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_user_group(obj_name, cur_user=usr.username)

    # Always remove the user group, even when an assertion fails; sibling
    # tests reuse the same group name.
    try:
        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
              'Switch owners or remove those user groups:%s' % (username,
                                                                obj_name)
        assert_session_flash(response, msg)
    finally:
        fixture.destroy_user_group(obj_name)
def test_delete_owner_of_user_group_detaching(self):
    """Delete a user-group owner while asking to detach the groups.

    Posts ``user_user_groups=detach`` and expects the
    'Detached 1 user groups' flash message; the user group is destroyed
    afterwards regardless of the outcome.
    """
    self.log_user()
    username = 'newtestuserdeleteme_user_group_owner_detaching'
    obj_name = 'test_user_group'
    owner = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_user_group(obj_name, cur_user=owner.username)

    user_query = Session().query(User).filter(User.username == username)
    new_user = user_query.one()

    try:
        delete_form = {
            '_method': 'delete',
            'user_user_groups': 'detach',
            'csrf_token': self.csrf_token,
        }
        response = self.app.post(
            url('user', user_id=new_user.user_id), params=delete_form)

        assert_session_flash(response, 'Detached 1 user groups')
    finally:
        fixture.destroy_user_group(obj_name)
def test_delete_owner_of_user_group_deleting(self):
    """Delete a user-group owner while asking to delete the groups.

    Posts ``user_user_groups=delete`` and expects the
    'Deleted 1 user groups' flash message.
    """
    self.log_user()
    username = 'newtestuserdeleteme_user_group_owner_deleting'
    obj_name = 'test_user_group'
    owner = fixture.create_user(name=username)
    self.destroy_users.add(username)
    fixture.create_user_group(obj_name, cur_user=owner.username)

    user_query = Session().query(User).filter(User.username == username)
    new_user = user_query.one()

    delete_form = {
        '_method': 'delete',
        'user_user_groups': 'delete',
        'csrf_token': self.csrf_token,
    }
    response = self.app.post(
        url('user', user_id=new_user.user_id), params=delete_form)

    assert_session_flash(response, 'Deleted 1 user groups')
def test_edit(self):
    """Log in and request the edit page of the admin test user."""
    self.log_user()
    admin = User.get_by_username(TEST_USER_ADMIN_LOGIN)
    edit_page = url('edit_user', user_id=admin.user_id)
    self.app.get(edit_page)
@pytest.mark.parametrize(
    'repo_create, repo_create_write, user_group_create, repo_group_create,'
    'fork_create, inherit_default_permissions, expect_error,'
    'expect_form_error', [
        ('hg.create.none', 'hg.create.write_on_repogroup.false',
         'hg.usergroup.create.false', 'hg.repogroup.create.false',
         'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
        ('hg.create.repository', 'hg.create.write_on_repogroup.false',
         'hg.usergroup.create.false', 'hg.repogroup.create.false',
         'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
        ('hg.create.repository', 'hg.create.write_on_repogroup.true',
         'hg.usergroup.create.true', 'hg.repogroup.create.true',
         'hg.fork.repository', 'hg.inherit_default_perms.false', False,
         False),
        ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
         'hg.usergroup.create.true', 'hg.repogroup.create.true',
         'hg.fork.repository', 'hg.inherit_default_perms.false', False,
         True),
        ('', '', '', '', '', '', True, False),
    ])
def test_global_perms_on_user(
        self, repo_create, repo_create_write, user_group_create,
        repo_group_create, fork_create, expect_error, expect_form_error,
        inherit_default_permissions):
    """Post global permission settings for a user and check the outcome.

    Three outcomes are covered: a form validation error (status 200 with
    'Value must be one of'), a save error flash, or a success flash with
    the user's default permissions equal to the posted values.

    :param expect_error: the error flash message is expected.
    :param expect_form_error: the form is expected to be re-rendered with
        a validation error.

    The remaining parameters are the permission values posted in the form.
    """
    self.log_user()
    user = fixture.create_user('dummy')
    uid = user.user_id

    # Destroy the user even when an assertion fails; every parametrized
    # case creates the same 'dummy' account again.
    try:
        # form payload carrying the permission values under test
        perm_params = {
            'inherit_default_permissions': False,
            'default_repo_create': repo_create,
            'default_repo_create_on_write': repo_create_write,
            'default_user_group_create': user_group_create,
            'default_repo_group_create': repo_group_create,
            'default_fork_create': fork_create,
            'default_inherit_default_permissions': inherit_default_permissions,
            '_method': 'put',
            'csrf_token': self.csrf_token,
        }
        response = self.app.post(
            url('edit_user_global_perms', user_id=uid),
            params=perm_params)

        if expect_form_error:
            assert response.status_int == 200
            response.mustcontain('Value must be one of')
        elif expect_error:
            assert_session_flash(
                response, 'An error occurred during permissions saving')
        else:
            ug = User.get(uid)
            # these keys are form plumbing, not stored permissions
            del perm_params['_method']
            del perm_params['inherit_default_permissions']
            del perm_params['csrf_token']
            assert perm_params == ug.get_default_perms()
            assert_session_flash(
                response, 'User global permissions updated successfully')
    finally:
        fixture.destroy_user(uid)
def test_global_permissions_initial_values(self, user_util):
    """Check the pre-selected values on a new user's global perms page.

    For each listed permission, the checked form element must carry the
    value found in the default user's default permissions.

    :param user_util: fixture used to create the user under test.
    """
    self.log_user()
    uid = user_util.create_user().user_id
    response = self.app.get(url('edit_user_global_perms', user_id=uid))

    default_permissions = User.get_default_user().get_default_perms()
    assert_response = AssertResponse(response)

    expected_permissions = (
        'default_repo_create', 'default_repo_create_on_write',
        'default_fork_create', 'default_repo_group_create',
        'default_user_group_create', 'default_inherit_default_permissions')
    for permission in expected_permissions:
        checked_element = assert_response.get_element(
            '[name={}][checked=checked]'.format(permission))
        assert checked_element.value == default_permissions[permission]
def test_ips(self):
    """The IP page of the regular test user reports no restrictions."""
    self.log_user()
    regular_user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
    ips_page = url('edit_user_ips', user_id=regular_user.user_id)
    response = self.app.get(ips_page)
    response.mustcontain('All IP addresses are allowed')
@pytest.mark.parametrize("test_name, ip, ip_range, failure", [
    ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
    ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
    ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
    ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
    ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
    ('127_bad_ip', 'foobar', 'foobar', True),
])
def test_add_ip(self, test_name, ip, ip_range, failure):
    """Add an IP entry for the regular test user and check the IP page.

    :param test_name: label of the case (unused in the body).
    :param ip: value posted as ``new_ip``.
    :param ip_range: range text expected on the page for a valid `ip`.
    :param failure: when true, the validation flash message is expected
        and neither `ip` nor `ip_range` may appear on the page.
    """
    self.log_user()
    user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
    user_id = user.user_id

    # The entries are attached to a shared test user, so they must be
    # removed even when an assertion fails; otherwise later tests see
    # leftover IP restrictions.
    try:
        response = self.app.post(url('edit_user_ips', user_id=user_id),
                                 params={'new_ip': ip, '_method': 'put',
                                         'csrf_token': self.csrf_token})

        if failure:
            assert_session_flash(
                response, 'Please enter a valid IPv4 or IpV6 address')
            response = self.app.get(url('edit_user_ips', user_id=user_id))
            response.mustcontain(no=[ip])
            response.mustcontain(no=[ip_range])
        else:
            response = self.app.get(url('edit_user_ips', user_id=user_id))
            response.mustcontain(ip)
            response.mustcontain(ip_range)
    finally:
        # cleanup
        for del_ip in UserIpMap.query().filter(
                UserIpMap.user_id == user_id).all():
            Session().delete(del_ip)
        Session().commit()
def test_delete_ip(self):
    """Remove an IP entry of the regular test user via the controller.

    The entry is first added through ``UserModel.add_extra_ip`` and must
    be listed; after the delete request the page must report that all IP
    addresses are allowed and show neither the IP nor its range.
    """
    self.log_user()
    regular_user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
    user_id = regular_user.user_id
    ip = '127.0.0.1/32'
    ip_range = '127.0.0.1 - 127.0.0.1'

    new_ip = UserModel().add_extra_ip(user_id, ip)
    Session().commit()
    new_ip_id = new_ip.ip_id

    def fetch_ips_page():
        # re-request the page so each check sees the current state
        return self.app.get(url('edit_user_ips', user_id=user_id))

    response = fetch_ips_page()
    for expected_text in (ip, ip_range):
        response.mustcontain(expected_text)

    delete_form = {
        '_method': 'delete',
        'del_ip_id': new_ip_id,
        'csrf_token': self.csrf_token,
    }
    self.app.post(url('edit_user_ips', user_id=user_id), params=delete_form)

    response = fetch_ips_page()
    response.mustcontain('All IP addresses are allowed')
    for removed_text in (ip, ip_range):
        response.mustcontain(no=[removed_text])