##// END OF EJS Templates
vcs: Only allow 'pull' actions on shadow repositories....
vcs: Only allow 'pull' actions on shadow repositories. We expose the shadow repositories of pull requests to allow easy CI integration and to let users access a pull request's shadow repository in order to investigate it. However, we do not want anyone or anything to push changes to a shadow repository.

File last commit:

r1:854a839a default
r891:910a0be0 default
Show More
test_admin_users.py
644 lines | 25.8 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import pytest
from sqlalchemy.orm.exc import NoResultFound
from rhodecode.lib.auth import check_password
from rhodecode.lib import helpers as h
from rhodecode.model import validators
from rhodecode.model.db import User, UserIpMap, UserApiKeys
from rhodecode.model.meta import Session
from rhodecode.model.user import UserModel
from rhodecode.tests import (
TestController, url, link_to, TEST_USER_ADMIN_LOGIN,
TEST_USER_REGULAR_LOGIN, assert_session_flash)
from rhodecode.tests.fixture import Fixture
from rhodecode.tests.utils import AssertResponse
# Module-level fixture helper; the tests below use it to create and destroy
# users, repositories, repository groups and user groups.
fixture = Fixture()
class TestAdminUsersController(TestController):
    """Functional tests for the admin users controller.

    The tests drive the ``users``/``user``/``edit_user*`` routes through
    ``self.app`` and verify the outcome via session flash messages, rendered
    page content and the database state.
    """

    # Username of the scratch account used by ``test_update``.
    test_user_1 = 'testme'
    # Usernames registered by tests for removal in ``teardown_method``.
    # This set is shared class state, so it is emptied after every teardown.
    destroy_users = set()

    @classmethod
    def teardown_method(cls, method):
        """Destroy all users registered in ``destroy_users`` by the last test.

        :param method: the test method that just ran (unused).
        """
        fixture.destroy_users(cls.destroy_users)
        # Forget the processed names so later tests do not handle them again.
        cls.destroy_users.clear()

    def _post_delete_user(self, username, **extra_params):
        """POST a delete request for ``username`` and return the response.

        :param username: login of an existing user; exactly one match is
            required, otherwise the query raises.
        :param extra_params: additional form fields, e.g.
            ``user_repos='detach'``.
        """
        user = Session().query(User).filter(User.username == username).one()
        params = {'_method': 'delete', 'csrf_token': self.csrf_token}
        params.update(extra_params)
        return self.app.post(url('user', user_id=user.user_id), params=params)

    def test_index(self):
        """The users listing can be requested by a logged-in user."""
        self.log_user()
        self.app.get(url('users'))

    def test_create(self):
        """Posting a valid form creates the user and flashes a link to it."""
        self.log_user()
        username = 'newtestuser'
        password = 'test12'
        password_confirmation = password
        name = 'name'
        lastname = 'lastname'
        email = 'mail@mail.com'

        # Register for cleanup first, so the account is removed even when one
        # of the assertions below fails.
        self.destroy_users.add(username)

        self.app.get(url('new_user'))
        response = self.app.post(url('users'), params={
            'username': username,
            'password': password,
            'password_confirmation': password_confirmation,
            'firstname': name,
            'active': True,
            'lastname': lastname,
            'extern_name': 'rhodecode',
            'extern_type': 'rhodecode',
            'email': email,
            'csrf_token': self.csrf_token,
        })
        user_link = link_to(
            username,
            url('edit_user', user_id=User.get_by_username(username).user_id))
        assert_session_flash(response, 'Created user %s' % (user_link,))

        new_user = User.query().filter(User.username == username).one()
        assert new_user.username == username
        assert check_password(password, new_user.password)
        assert new_user.name == name
        assert new_user.lastname == lastname
        assert new_user.email == email

        # Follow the redirect once; the target page must mention the user.
        response = response.follow()
        response.mustcontain(username)

    def test_create_err(self):
        """Invalid form data renders validation errors and creates no user."""
        self.log_user()
        username = 'new_user'
        password = ''
        name = 'name'
        lastname = 'lastname'
        email = 'errmail.com'

        self.app.get(url('new_user'))
        response = self.app.post(url('users'), params={
            'username': username,
            'password': password,
            'name': name,
            'active': False,
            'lastname': lastname,
            'email': email,
            'csrf_token': self.csrf_token,
        })

        msg = validators.ValidUsername(
            False, {})._messages['system_invalid_username']
        msg = h.html_escape(msg % {'username': username})
        response.mustcontain('<span class="error-message">%s</span>' % msg)
        response.mustcontain(
            '<span class="error-message">Please enter a value</span>')
        response.mustcontain(
            '<span class="error-message">An email address must contain a'
            ' single @</span>')

        # The rejected form must not have created the account.
        with pytest.raises(NoResultFound):
            Session().query(User).filter(User.username == username).one()

    def test_new(self):
        """The new-user form can be requested by a logged-in user."""
        self.log_user()
        self.app.get(url('new_user'))

    @pytest.mark.parametrize("name, attrs", [
        ('firstname', {'firstname': 'new_username'}),
        ('lastname', {'lastname': 'new_username'}),
        ('admin', {'admin': True}),
        ('admin', {'admin': False}),
        ('extern_type', {'extern_type': 'ldap'}),
        ('extern_type', {'extern_type': None}),
        ('extern_name', {'extern_name': 'test'}),
        ('extern_name', {'extern_name': None}),
        ('active', {'active': False}),
        ('active', {'active': True}),
        ('email', {'email': 'some@email.com'}),
        ('language', {'language': 'de'}),
        ('language', {'language': 'en'}),
        # NOTE(review): password update case is disabled in the original;
        # the reason is not visible here.
        # ('new_password', {'new_password': 'foobar123',
        #                   'password_confirmation': 'foobar123'})
    ])
    def test_update(self, name, attrs):
        """Updating a single attribute is reflected in the user's API data.

        :param name: label of the attribute under test; selects the
            special-case handling of the expected values.
        :param attrs: form fields to override in the update request.
        """
        self.log_user()
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
                                  email='testme@rhodecode.org',
                                  extern_type='rhodecode',
                                  extern_name=self.test_user_1,
                                  skip_if_exists=True)
        Session().commit()
        self.destroy_users.add(self.test_user_1)

        params = usr.get_api_data()
        cur_lang = params['language'] or 'en'
        params.update({
            'password_confirmation': '',
            'new_password': '',
            'language': cur_lang,
            '_method': 'put',
            'csrf_token': self.csrf_token,
        })
        params.update(attrs)

        if name == 'email':
            params['emails'] = [attrs['email']]
        elif name == 'extern_type':
            # cannot update this via form, expected value is original one
            params['extern_type'] = "rhodecode"
        elif name == 'extern_name':
            # cannot update this via form, expected value is original one;
            # special case: since this user has not logged in yet his data
            # is not filled, so we use the creation data
            params['extern_name'] = self.test_user_1

        response = self.app.post(url('user', user_id=usr.user_id), params)
        assert response.status_int == 302
        assert_session_flash(response, 'User updated successfully')

        updated_user = User.get_by_username(self.test_user_1)
        updated_params = updated_user.get_api_data()
        updated_params.update({
            'password_confirmation': '',
            'new_password': '',
        })

        # Request-only fields are not part of the stored user data.
        del params['_method']
        del params['csrf_token']
        assert params == updated_params

    def test_update_and_migrate_password(
            self, autologin_user, real_crypto_backend):
        """Setting a new password on a sha256-hashed account re-hashes it."""
        from rhodecode.lib import auth

        # create new user, with sha256 password
        temp_user = 'test_admin_sha256'
        user = fixture.create_user(temp_user)
        user.password = auth._RhodeCodeCryptoSha256().hash_create(
            b'test123')
        Session().add(user)
        Session().commit()
        self.destroy_users.add(temp_user)

        params = user.get_api_data()
        params.update({
            'password_confirmation': 'qweqwe123',
            'new_password': 'qweqwe123',
            'language': 'en',
            '_method': 'put',
            'csrf_token': autologin_user.csrf_token,
        })

        response = self.app.post(url('user', user_id=user.user_id), params)
        assert response.status_int == 302
        assert_session_flash(response, 'User updated successfully')

        # new password should be bcrypted, after log-in and transfer
        updated_user = User.get_by_username(temp_user)
        assert updated_user.password.startswith('$')

        updated_params = updated_user.get_api_data()
        updated_params.update({
            'password_confirmation': 'qweqwe123',
            'new_password': 'qweqwe123',
        })

        # Request-only fields are not part of the stored user data.
        del params['_method']
        del params['csrf_token']
        assert params == updated_params

    def test_delete(self):
        """A user owning nothing can be deleted."""
        self.log_user()
        username = 'newtestuserdeleteme'
        fixture.create_user(name=username)

        response = self._post_delete_user(username)
        assert_session_flash(response, 'Successfully deleted user')

    def test_delete_owner_of_repository(self):
        """Deleting a repository owner without a strategy is refused."""
        self.log_user()
        username = 'newtestuserdeleteme_repo_owner'
        obj_name = 'test_repo'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo(obj_name, cur_user=usr.username)

        try:
            response = self._post_delete_user(username)
            msg = (
                'user "%s" still owns 1 repositories and cannot be removed. '
                'Switch owners or remove those repositories:%s'
                % (username, obj_name))
            assert_session_flash(response, msg)
        finally:
            # Remove the repository even when an assertion above failed.
            fixture.destroy_repo(obj_name)

    def test_delete_owner_of_repository_detaching(self):
        """Deleting a repository owner with ``detach`` detaches the repo."""
        self.log_user()
        username = 'newtestuserdeleteme_repo_owner_detach'
        obj_name = 'test_repo'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo(obj_name, cur_user=usr.username)

        try:
            response = self._post_delete_user(username, user_repos='detach')
            assert_session_flash(response, 'Detached 1 repositories')
        finally:
            # Remove the repository even when an assertion above failed.
            fixture.destroy_repo(obj_name)

    def test_delete_owner_of_repository_deleting(self):
        """Deleting a repository owner with ``delete`` removes the repo."""
        self.log_user()
        username = 'newtestuserdeleteme_repo_owner_delete'
        obj_name = 'test_repo'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo(obj_name, cur_user=usr.username)

        response = self._post_delete_user(username, user_repos='delete')
        assert_session_flash(response, 'Deleted 1 repositories')

    def test_delete_owner_of_repository_group(self):
        """Deleting a repository-group owner without a strategy is refused."""
        self.log_user()
        username = 'newtestuserdeleteme_repo_group_owner'
        obj_name = 'test_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo_group(obj_name, cur_user=usr.username)

        try:
            response = self._post_delete_user(username)
            msg = (
                'user "%s" still owns 1 repository groups and cannot be '
                'removed. Switch owners or remove those repository groups:%s'
                % (username, obj_name))
            assert_session_flash(response, msg)
        finally:
            # Remove the group even when an assertion above failed.
            fixture.destroy_repo_group(obj_name)

    def test_delete_owner_of_repository_group_detaching(self):
        """Deleting a group owner with ``detach`` detaches the repo group."""
        self.log_user()
        username = 'newtestuserdeleteme_repo_group_owner_detach'
        obj_name = 'test_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo_group(obj_name, cur_user=usr.username)

        try:
            response = self._post_delete_user(
                username, user_repo_groups='detach')
            assert_session_flash(response, 'Detached 1 repository groups')
        finally:
            # The group is expected to still exist after detaching; remove it
            # even when an assertion above failed.
            fixture.destroy_repo_group(obj_name)

    def test_delete_owner_of_repository_group_deleting(self):
        """Deleting a group owner with ``delete`` removes the repo group."""
        self.log_user()
        username = 'newtestuserdeleteme_repo_group_owner_delete'
        obj_name = 'test_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo_group(obj_name, cur_user=usr.username)

        response = self._post_delete_user(username, user_repo_groups='delete')
        assert_session_flash(response, 'Deleted 1 repository groups')

    def test_delete_owner_of_user_group(self):
        """Deleting a user-group owner without a strategy is refused."""
        self.log_user()
        username = 'newtestuserdeleteme_user_group_owner'
        obj_name = 'test_user_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_user_group(obj_name, cur_user=usr.username)

        try:
            response = self._post_delete_user(username)
            msg = (
                'user "%s" still owns 1 user groups and cannot be removed. '
                'Switch owners or remove those user groups:%s'
                % (username, obj_name))
            assert_session_flash(response, msg)
        finally:
            # Remove the user group even when an assertion above failed.
            fixture.destroy_user_group(obj_name)

    def test_delete_owner_of_user_group_detaching(self):
        """Deleting a user-group owner with ``detach`` detaches the group."""
        self.log_user()
        username = 'newtestuserdeleteme_user_group_owner_detaching'
        obj_name = 'test_user_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_user_group(obj_name, cur_user=usr.username)

        try:
            response = self._post_delete_user(
                username, user_user_groups='detach')
            assert_session_flash(response, 'Detached 1 user groups')
        finally:
            fixture.destroy_user_group(obj_name)

    def test_delete_owner_of_user_group_deleting(self):
        """Deleting a user-group owner with ``delete`` removes the group."""
        self.log_user()
        username = 'newtestuserdeleteme_user_group_owner_deleting'
        obj_name = 'test_user_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_user_group(obj_name, cur_user=usr.username)

        response = self._post_delete_user(username, user_user_groups='delete')
        assert_session_flash(response, 'Deleted 1 user groups')

    def test_show(self):
        """The ``user`` route for user id 1 can be requested."""
        self.app.get(url('user', user_id=1))

    def test_edit(self):
        """The edit page of the admin test user can be requested."""
        self.log_user()
        user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
        self.app.get(url('edit_user', user_id=user.user_id))

    @pytest.mark.parametrize(
        'repo_create, repo_create_write, user_group_create, repo_group_create,'
        'fork_create, inherit_default_permissions, expect_error,'
        'expect_form_error', [
            ('hg.create.none', 'hg.create.write_on_repogroup.false',
             'hg.usergroup.create.false', 'hg.repogroup.create.false',
             'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
            ('hg.create.repository', 'hg.create.write_on_repogroup.false',
             'hg.usergroup.create.false', 'hg.repogroup.create.false',
             'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
            ('hg.create.repository', 'hg.create.write_on_repogroup.true',
             'hg.usergroup.create.true', 'hg.repogroup.create.true',
             'hg.fork.repository', 'hg.inherit_default_perms.false', False,
             False),
            ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
             'hg.usergroup.create.true', 'hg.repogroup.create.true',
             'hg.fork.repository', 'hg.inherit_default_perms.false', False,
             True),
            ('', '', '', '', '', '', True, False),
        ])
    def test_global_perms_on_user(
            self, repo_create, repo_create_write, user_group_create,
            repo_group_create, fork_create, expect_error, expect_form_error,
            inherit_default_permissions):
        """Saving a user's global permissions succeeds, errors or re-renders.

        :param expect_error: the save is expected to flash an error message.
        :param expect_form_error: the form is expected to be re-rendered
            (HTTP 200) with a validation message.

        The remaining parameters are the permission values posted to the
        form.
        """
        self.log_user()
        user = fixture.create_user('dummy')
        uid = user.user_id

        try:
            # ENABLE REPO CREATE ON A GROUP
            perm_params = {
                'inherit_default_permissions': False,
                'default_repo_create': repo_create,
                'default_repo_create_on_write': repo_create_write,
                'default_user_group_create': user_group_create,
                'default_repo_group_create': repo_group_create,
                'default_fork_create': fork_create,
                'default_inherit_default_permissions':
                    inherit_default_permissions,
                '_method': 'put',
                'csrf_token': self.csrf_token,
            }
            response = self.app.post(
                url('edit_user_global_perms', user_id=uid),
                params=perm_params)

            if expect_form_error:
                assert response.status_int == 200
                response.mustcontain('Value must be one of')
            else:
                if expect_error:
                    msg = 'An error occurred during permissions saving'
                else:
                    msg = 'User global permissions updated successfully'
                    ug = User.get(uid)
                    # Request-only fields are not stored as permissions.
                    del perm_params['_method']
                    del perm_params['inherit_default_permissions']
                    del perm_params['csrf_token']
                    assert perm_params == ug.get_default_perms()
                assert_session_flash(response, msg)
        finally:
            # Every parametrized case re-creates 'dummy'; always remove it so
            # one failing case does not affect the following ones.
            fixture.destroy_user(uid)

    def test_global_permissions_initial_values(self, user_util):
        """A new user's permission form shows the default user's values."""
        self.log_user()
        user = user_util.create_user()
        uid = user.user_id
        response = self.app.get(url('edit_user_global_perms', user_id=uid))

        default_user = User.get_default_user()
        default_permissions = default_user.get_default_perms()
        assert_response = AssertResponse(response)
        expected_permissions = (
            'default_repo_create', 'default_repo_create_on_write',
            'default_fork_create', 'default_repo_group_create',
            'default_user_group_create', 'default_inherit_default_permissions')

        for permission in expected_permissions:
            css_selector = '[name={}][checked=checked]'.format(permission)
            element = assert_response.get_element(css_selector)
            assert element.value == default_permissions[permission]

    def test_ips(self):
        """The IP page of the regular test user reports no restriction."""
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        response = self.app.get(url('edit_user_ips', user_id=user.user_id))
        response.mustcontain('All IP addresses are allowed')

    @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
        ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
        ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
        ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
        ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
        ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
        ('127_bad_ip', 'foobar', 'foobar', True),
    ])
    def test_add_ip(self, test_name, ip, ip_range, failure):
        """Adding an IP lists it with its range, or flashes a validation error.

        :param test_name: label of the case (unused in the body).
        :param ip: value submitted as ``new_ip``.
        :param ip_range: range text expected on the page for a valid ``ip``.
        :param failure: whether the submission is expected to be rejected.
        """
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id

        try:
            response = self.app.post(
                url('edit_user_ips', user_id=user_id),
                params={'new_ip': ip, '_method': 'put',
                        'csrf_token': self.csrf_token})

            if failure:
                assert_session_flash(
                    response, 'Please enter a valid IPv4 or IpV6 address')
                response = self.app.get(url('edit_user_ips', user_id=user_id))
                response.mustcontain(no=[ip])
                response.mustcontain(no=[ip_range])
            else:
                response = self.app.get(url('edit_user_ips', user_id=user_id))
                response.mustcontain(ip)
                response.mustcontain(ip_range)
        finally:
            # cleanup: always drop stored IP entries so a failing case does
            # not leak restrictions into the following tests
            for del_ip in UserIpMap.query().filter(
                    UserIpMap.user_id == user_id).all():
                Session().delete(del_ip)
            Session().commit()

    def test_delete_ip(self):
        """Deleting the only IP entry restores the no-restriction message."""
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id
        ip = '127.0.0.1/32'
        ip_range = '127.0.0.1 - 127.0.0.1'
        new_ip = UserModel().add_extra_ip(user_id, ip)
        Session().commit()
        new_ip_id = new_ip.ip_id

        response = self.app.get(url('edit_user_ips', user_id=user_id))
        response.mustcontain(ip)
        response.mustcontain(ip_range)

        self.app.post(url('edit_user_ips', user_id=user_id),
                      params={'_method': 'delete', 'del_ip_id': new_ip_id,
                              'csrf_token': self.csrf_token})

        response = self.app.get(url('edit_user_ips', user_id=user_id))
        response.mustcontain('All IP addresses are allowed')
        response.mustcontain(no=[ip])
        response.mustcontain(no=[ip_range])

    def test_auth_tokens(self):
        """The auth-token page shows the user's API key as never expiring."""
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        response = self.app.get(
            url('edit_user_auth_tokens', user_id=user.user_id))
        response.mustcontain(user.api_key)
        response.mustcontain('expires: never')

    @pytest.mark.parametrize("desc, lifetime", [
        ('forever', -1),
        ('5mins', 60*5),
        ('30days', 60*60*24*30),
    ])
    def test_add_auth_token(self, desc, lifetime):
        """Creating an auth token flashes success and lists the token.

        :param desc: description submitted for the token.
        :param lifetime: lifetime value submitted for the token.
        """
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id

        response = self.app.post(
            url('edit_user_auth_tokens', user_id=user_id),
            {'_method': 'put', 'description': desc, 'lifetime': lifetime,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully created')

        try:
            response = response.follow()
            user = User.get(user_id)
            for auth_token in user.auth_tokens:
                response.mustcontain(auth_token)
        finally:
            # Remove the extra tokens again, whatever the outcome above.
            for api_key in UserApiKeys.query().filter(
                    UserApiKeys.user_id == user_id).all():
                Session().delete(api_key)
            Session().commit()

    def test_remove_auth_token(self):
        """A created auth token can be deleted again."""
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id

        response = self.app.post(
            url('edit_user_auth_tokens', user_id=user_id),
            {'_method': 'put', 'description': 'desc', 'lifetime': -1,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully created')
        response.follow()

        # now delete our key
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
        assert 1 == len(keys)

        response = self.app.post(
            url('edit_user_auth_tokens', user_id=user_id),
            {'_method': 'delete', 'del_auth_token': keys[0].api_key,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully deleted')

        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
        assert 0 == len(keys)

    def test_reset_main_auth_token(self):
        """Resetting the builtin token removes the old key from the page."""
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id
        api_key = user.api_key

        response = self.app.get(url('edit_user_auth_tokens', user_id=user_id))
        response.mustcontain(api_key)
        response.mustcontain('expires: never')

        response = self.app.post(
            url('edit_user_auth_tokens', user_id=user_id),
            {'_method': 'delete', 'del_auth_token_builtin': api_key,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully reset')

        response = response.follow()
        response.mustcontain(no=[api_key])