##// END OF EJS Templates
diffs: use whole chunk diff to calculate if it's oversized or not....
diffs: use whole chunk diff to calculate if it's oversized or not. - This fixes an issue if a file is added that has very large number of small lines. In this case the time to detect if the diff should be limited was very very long and CPU intensive.

File last commit:

r1821:a4cc42be default
r2070:7939c6bf default
Show More
test_admin_users.py
512 lines | 20.5 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2017 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import pytest
from sqlalchemy.orm.exc import NoResultFound
from rhodecode.lib.auth import check_password
from rhodecode.lib import helpers as h
from rhodecode.model import validators
from rhodecode.model.db import User, UserIpMap, UserApiKeys
from rhodecode.model.meta import Session
from rhodecode.model.user import UserModel
from rhodecode.tests import (
TestController, url, link_to, TEST_USER_ADMIN_LOGIN,
TEST_USER_REGULAR_LOGIN, assert_session_flash)
from rhodecode.tests.fixture import Fixture
from rhodecode.tests.utils import AssertResponse
fixture = Fixture()
def route_path(name, params=None, **kwargs):
import urllib
from rhodecode.apps._base import ADMIN_PREFIX
base_url = {
'users_data':
ADMIN_PREFIX + '/users_data',
}[name].format(**kwargs)
if params:
base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
return base_url
class TestAdminUsersController(TestController):
test_user_1 = 'testme'
destroy_users = set()
@classmethod
def teardown_method(cls, method):
fixture.destroy_users(cls.destroy_users)
def test_create(self, xhr_header):
self.log_user()
username = 'newtestuser'
password = 'test12'
password_confirmation = password
name = 'name'
lastname = 'lastname'
email = 'mail@mail.com'
self.app.get(url('new_user'))
response = self.app.post(url('users'), params={
'username': username,
'password': password,
'password_confirmation': password_confirmation,
'firstname': name,
'active': True,
'lastname': lastname,
'extern_name': 'rhodecode',
'extern_type': 'rhodecode',
'email': email,
'csrf_token': self.csrf_token,
})
user_link = link_to(
username,
url('edit_user', user_id=User.get_by_username(username).user_id))
assert_session_flash(response, 'Created user %s' % (user_link,))
self.destroy_users.add(username)
new_user = User.query().filter(User.username == username).one()
assert new_user.username == username
assert check_password(password, new_user.password)
assert new_user.name == name
assert new_user.lastname == lastname
assert new_user.email == email
response = self.app.get(route_path('users_data'),
extra_environ=xhr_header)
response.mustcontain(username)
def test_create_err(self):
self.log_user()
username = 'new_user'
password = ''
name = 'name'
lastname = 'lastname'
email = 'errmail.com'
self.app.get(url('new_user'))
response = self.app.post(url('users'), params={
'username': username,
'password': password,
'name': name,
'active': False,
'lastname': lastname,
'email': email,
'csrf_token': self.csrf_token,
})
msg = validators.ValidUsername(
False, {})._messages['system_invalid_username']
msg = h.html_escape(msg % {'username': 'new_user'})
response.mustcontain('<span class="error-message">%s</span>' % msg)
response.mustcontain(
'<span class="error-message">Please enter a value</span>')
response.mustcontain(
'<span class="error-message">An email address must contain a'
' single @</span>')
def get_user():
Session().query(User).filter(User.username == username).one()
with pytest.raises(NoResultFound):
get_user()
def test_new(self):
self.log_user()
self.app.get(url('new_user'))
@pytest.mark.parametrize("name, attrs", [
('firstname', {'firstname': 'new_username'}),
('lastname', {'lastname': 'new_username'}),
('admin', {'admin': True}),
('admin', {'admin': False}),
('extern_type', {'extern_type': 'ldap'}),
('extern_type', {'extern_type': None}),
('extern_name', {'extern_name': 'test'}),
('extern_name', {'extern_name': None}),
('active', {'active': False}),
('active', {'active': True}),
('email', {'email': 'some@email.com'}),
('language', {'language': 'de'}),
('language', {'language': 'en'}),
# ('new_password', {'new_password': 'foobar123',
# 'password_confirmation': 'foobar123'})
])
def test_update(self, name, attrs):
self.log_user()
usr = fixture.create_user(self.test_user_1, password='qweqwe',
email='testme@rhodecode.org',
extern_type='rhodecode',
extern_name=self.test_user_1,
skip_if_exists=True)
Session().commit()
self.destroy_users.add(self.test_user_1)
params = usr.get_api_data()
cur_lang = params['language'] or 'en'
params.update({
'password_confirmation': '',
'new_password': '',
'language': cur_lang,
'_method': 'put',
'csrf_token': self.csrf_token,
})
params.update({'new_password': ''})
params.update(attrs)
if name == 'email':
params['emails'] = [attrs['email']]
elif name == 'extern_type':
# cannot update this via form, expected value is original one
params['extern_type'] = "rhodecode"
elif name == 'extern_name':
# cannot update this via form, expected value is original one
params['extern_name'] = self.test_user_1
# special case since this user is not
# logged in yet his data is not filled
# so we use creation data
response = self.app.post(url('user', user_id=usr.user_id), params)
assert response.status_int == 302
assert_session_flash(response, 'User updated successfully')
updated_user = User.get_by_username(self.test_user_1)
updated_params = updated_user.get_api_data()
updated_params.update({'password_confirmation': ''})
updated_params.update({'new_password': ''})
del params['_method']
del params['csrf_token']
assert params == updated_params
def test_update_and_migrate_password(
self, autologin_user, real_crypto_backend):
from rhodecode.lib import auth
# create new user, with sha256 password
temp_user = 'test_admin_sha256'
user = fixture.create_user(temp_user)
user.password = auth._RhodeCodeCryptoSha256().hash_create(
b'test123')
Session().add(user)
Session().commit()
self.destroy_users.add('test_admin_sha256')
params = user.get_api_data()
params.update({
'password_confirmation': 'qweqwe123',
'new_password': 'qweqwe123',
'language': 'en',
'_method': 'put',
'csrf_token': autologin_user.csrf_token,
})
response = self.app.post(url('user', user_id=user.user_id), params)
assert response.status_int == 302
assert_session_flash(response, 'User updated successfully')
# new password should be bcrypted, after log-in and transfer
user = User.get_by_username(temp_user)
assert user.password.startswith('$')
updated_user = User.get_by_username(temp_user)
updated_params = updated_user.get_api_data()
updated_params.update({'password_confirmation': 'qweqwe123'})
updated_params.update({'new_password': 'qweqwe123'})
del params['_method']
del params['csrf_token']
assert params == updated_params
def test_delete(self):
self.log_user()
username = 'newtestuserdeleteme'
fixture.create_user(name=username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'csrf_token': self.csrf_token})
assert_session_flash(response, 'Successfully deleted user')
def test_delete_owner_of_repository(self):
self.log_user()
username = 'newtestuserdeleteme_repo_owner'
obj_name = 'test_repo'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_repo(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'csrf_token': self.csrf_token})
msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
'Switch owners or remove those repositories:%s' % (username,
obj_name)
assert_session_flash(response, msg)
fixture.destroy_repo(obj_name)
def test_delete_owner_of_repository_detaching(self):
self.log_user()
username = 'newtestuserdeleteme_repo_owner_detach'
obj_name = 'test_repo'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_repo(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'user_repos': 'detach',
'csrf_token': self.csrf_token})
msg = 'Detached 1 repositories'
assert_session_flash(response, msg)
fixture.destroy_repo(obj_name)
def test_delete_owner_of_repository_deleting(self):
self.log_user()
username = 'newtestuserdeleteme_repo_owner_delete'
obj_name = 'test_repo'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_repo(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'user_repos': 'delete',
'csrf_token': self.csrf_token})
msg = 'Deleted 1 repositories'
assert_session_flash(response, msg)
def test_delete_owner_of_repository_group(self):
self.log_user()
username = 'newtestuserdeleteme_repo_group_owner'
obj_name = 'test_group'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_repo_group(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'csrf_token': self.csrf_token})
msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
'Switch owners or remove those repository groups:%s' % (username,
obj_name)
assert_session_flash(response, msg)
fixture.destroy_repo_group(obj_name)
def test_delete_owner_of_repository_group_detaching(self):
self.log_user()
username = 'newtestuserdeleteme_repo_group_owner_detach'
obj_name = 'test_group'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_repo_group(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'user_repo_groups': 'delete',
'csrf_token': self.csrf_token})
msg = 'Deleted 1 repository groups'
assert_session_flash(response, msg)
def test_delete_owner_of_repository_group_deleting(self):
self.log_user()
username = 'newtestuserdeleteme_repo_group_owner_delete'
obj_name = 'test_group'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_repo_group(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'user_repo_groups': 'detach',
'csrf_token': self.csrf_token})
msg = 'Detached 1 repository groups'
assert_session_flash(response, msg)
fixture.destroy_repo_group(obj_name)
def test_delete_owner_of_user_group(self):
self.log_user()
username = 'newtestuserdeleteme_user_group_owner'
obj_name = 'test_user_group'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_user_group(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'csrf_token': self.csrf_token})
msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
'Switch owners or remove those user groups:%s' % (username,
obj_name)
assert_session_flash(response, msg)
fixture.destroy_user_group(obj_name)
def test_delete_owner_of_user_group_detaching(self):
self.log_user()
username = 'newtestuserdeleteme_user_group_owner_detaching'
obj_name = 'test_user_group'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_user_group(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
try:
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'user_user_groups': 'detach',
'csrf_token': self.csrf_token})
msg = 'Detached 1 user groups'
assert_session_flash(response, msg)
finally:
fixture.destroy_user_group(obj_name)
def test_delete_owner_of_user_group_deleting(self):
self.log_user()
username = 'newtestuserdeleteme_user_group_owner_deleting'
obj_name = 'test_user_group'
usr = fixture.create_user(name=username)
self.destroy_users.add(username)
fixture.create_user_group(obj_name, cur_user=usr.username)
new_user = Session().query(User)\
.filter(User.username == username).one()
response = self.app.post(url('user', user_id=new_user.user_id),
params={'_method': 'delete',
'user_user_groups': 'delete',
'csrf_token': self.csrf_token})
msg = 'Deleted 1 user groups'
assert_session_flash(response, msg)
def test_edit(self):
self.log_user()
user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
self.app.get(url('edit_user', user_id=user.user_id))
@pytest.mark.parametrize(
'repo_create, repo_create_write, user_group_create, repo_group_create,'
'fork_create, inherit_default_permissions, expect_error,'
'expect_form_error', [
('hg.create.none', 'hg.create.write_on_repogroup.false',
'hg.usergroup.create.false', 'hg.repogroup.create.false',
'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
('hg.create.repository', 'hg.create.write_on_repogroup.false',
'hg.usergroup.create.false', 'hg.repogroup.create.false',
'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
('hg.create.repository', 'hg.create.write_on_repogroup.true',
'hg.usergroup.create.true', 'hg.repogroup.create.true',
'hg.fork.repository', 'hg.inherit_default_perms.false', False,
False),
('hg.create.XXX', 'hg.create.write_on_repogroup.true',
'hg.usergroup.create.true', 'hg.repogroup.create.true',
'hg.fork.repository', 'hg.inherit_default_perms.false', False,
True),
('', '', '', '', '', '', True, False),
])
def test_global_perms_on_user(
self, repo_create, repo_create_write, user_group_create,
repo_group_create, fork_create, expect_error, expect_form_error,
inherit_default_permissions):
self.log_user()
user = fixture.create_user('dummy')
uid = user.user_id
# ENABLE REPO CREATE ON A GROUP
perm_params = {
'inherit_default_permissions': False,
'default_repo_create': repo_create,
'default_repo_create_on_write': repo_create_write,
'default_user_group_create': user_group_create,
'default_repo_group_create': repo_group_create,
'default_fork_create': fork_create,
'default_inherit_default_permissions': inherit_default_permissions,
'_method': 'put',
'csrf_token': self.csrf_token,
}
response = self.app.post(
url('edit_user_global_perms', user_id=uid),
params=perm_params)
if expect_form_error:
assert response.status_int == 200
response.mustcontain('Value must be one of')
else:
if expect_error:
msg = 'An error occurred during permissions saving'
else:
msg = 'User global permissions updated successfully'
ug = User.get(uid)
del perm_params['_method']
del perm_params['inherit_default_permissions']
del perm_params['csrf_token']
assert perm_params == ug.get_default_perms()
assert_session_flash(response, msg)
fixture.destroy_user(uid)
def test_global_permissions_initial_values(self, user_util):
self.log_user()
user = user_util.create_user()
uid = user.user_id
response = self.app.get(url('edit_user_global_perms', user_id=uid))
default_user = User.get_default_user()
default_permissions = default_user.get_default_perms()
assert_response = AssertResponse(response)
expected_permissions = (
'default_repo_create', 'default_repo_create_on_write',
'default_fork_create', 'default_repo_group_create',
'default_user_group_create', 'default_inherit_default_permissions')
for permission in expected_permissions:
css_selector = '[name={}][checked=checked]'.format(permission)
element = assert_response.get_element(css_selector)
assert element.value == default_permissions[permission]