test_admin_users.py
419 lines
| 16.7 KiB
| text/x-python
|
PythonLexer
Bradley M. Kuhn
|
r4116 | # -*- coding: utf-8 -*- | ||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
r2467 | from sqlalchemy.orm.exc import NoResultFound | |||
r691 | from rhodecode.tests import * | |||
Bradley M. Kuhn
|
r4116 | from rhodecode.tests.fixture import Fixture | ||
from rhodecode.model.db import User, Permission, UserIpMap, UserApiKeys | ||||
r691 | from rhodecode.lib.auth import check_password | |||
r1758 | from rhodecode.model.user import UserModel | |||
r2467 | from rhodecode.model import validators | |||
from rhodecode.lib import helpers as h | ||||
r2709 | from rhodecode.model.meta import Session | |||
r2467 | ||||
Bradley M. Kuhn
|
r4116 | fixture = Fixture() | ||
r691 | ||||
class TestAdminUsersController(TestController): | ||||
Bradley M. Kuhn
|
r4116 | test_user_1 = 'testme' | ||
@classmethod
def teardown_class(cls):
    """Delete the ``test_user_1`` account, if it exists, after the class ran."""
    leftover = User.get_by_username(cls.test_user_1)
    if not leftover:
        # Nothing was created (or it was already removed); nothing to commit.
        return
    UserModel().delete(cls.test_user_1)
    Session().commit()
r691 | ||||
def test_index(self):
    """Smoke test: request the ``users`` route after ``log_user()``."""
    self.log_user()
    # Only the request itself is exercised; the response is not inspected.
    self.app.get(url('users'))
def test_create(self):
    """Create a user through the ``users`` form and verify the stored row.

    Checks the session flash, the persisted ``User`` attributes (including
    the hashed password via ``check_password``) and that the page reached
    after the redirect mentions the new username.
    """
    self.log_user()
    username = 'newtestuser'
    password = 'test12'
    name = 'name'
    lastname = 'lastname'
    email = 'mail@mail.com'

    form_data = {
        'username': username,
        'password': password,
        'password_confirmation': password,
        'firstname': name,
        'active': True,
        'lastname': lastname,
        'extern_name': 'rhodecode',
        'extern_type': 'rhodecode',
        'email': email,
    }
    response = self.app.post(url('users'), form_data)
    self.checkSessionFlash(response, 'Created user %s' % username)

    created = Session().query(User).filter(User.username == username).one()
    self.assertEqual(created.username, username)
    self.assertEqual(check_password(password, created.password), True)
    self.assertEqual(created.name, name)
    self.assertEqual(created.lastname, lastname)
    self.assertEqual(created.email, email)

    # NOTE(review): the redirect is followed twice and the first result is
    # discarded. Possibly deliberate (so the flash message, which also
    # contains the username, is no longer on the page) - confirm before
    # collapsing into a single follow().
    response.follow()
    landing_page = response.follow()
    landing_page.mustcontain("""newtestuser""")
r691 | ||||
def test_create_err(self):
    """Submit an invalid user form and verify errors are shown and nothing is stored.

    The form carries an empty password and an e-mail without ``@``; the
    response must contain the username validator message, the
    "Please enter a value" message and the e-mail validator message, and no
    ``User`` row with the submitted username may exist afterwards.
    """
    self.log_user()
    username = 'new_user'
    password = ''
    name = 'name'
    lastname = 'lastname'
    email = 'errmail.com'
    response = self.app.post(url('users'), {'username': username,
                                            'password': password,
                                            'name': name,
                                            'active': False,
                                            'lastname': lastname,
                                            'email': email})

    msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
    msg = h.html_escape(msg % {'username': 'new_user'})
    response.mustcontain("""<span class="error-message">%s</span>""" % msg)
    response.mustcontain("""<span class="error-message">Please enter a value</span>""")
    response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")

    # assertRaises() takes no failure message, so fail explicitly to make
    # sure the diagnostic is actually reported when a row is found.
    try:
        Session().query(User).filter(User.username == username).one()
    except NoResultFound:
        pass
    else:
        self.fail('found user in database')
def test_new(self):
    """Smoke test: request the ``new_user`` route after ``log_user()``."""
    self.log_user()
    # The response is not inspected; only the request is exercised.
    self.app.get(url('new_user'))
Bradley M. Kuhn
|
@parameterized.expand([
    ('firstname', {'firstname': 'new_username'}),
    ('lastname', {'lastname': 'new_username'}),
    ('admin', {'admin': True}),
    ('admin', {'admin': False}),
    ('extern_type', {'extern_type': 'ldap'}),
    ('extern_type', {'extern_type': None}),
    ('extern_name', {'extern_name': 'test'}),
    ('extern_name', {'extern_name': None}),
    ('active', {'active': False}),
    ('active', {'active': True}),
    ('email', {'email': 'some@email.com'}),
    # ('new_password', {'new_password': 'foobar123',
    #                   'password_confirmation': 'foobar123'})
])
def test_update(self, name, attrs):
    """Update one attribute of ``test_user_1`` via PUT and compare API data.

    :param name: label of the attribute under test; selects the special
        handling applied to the expected data below.
    :param attrs: dict merged into the submitted form data.
    """
    self.log_user()
    account = fixture.create_user(self.test_user_1, password='qweqwe',
                                  email='testme@rhodecode.org',
                                  extern_type='rhodecode',
                                  extern_name=self.test_user_1,
                                  skip_if_exists=True)
    Session().commit()

    expected = account.get_api_data()
    expected['password_confirmation'] = ''
    expected['new_password'] = ''
    expected.update(attrs)

    if name == 'email':
        expected['emails'] = [attrs['email']]
    elif name == 'extern_type':
        # cannot update this via form, expected value is original one
        expected['extern_type'] = "rhodecode"
    elif name == 'extern_name':
        # cannot update this via form, expected value is original one
        expected['extern_name'] = self.test_user_1

    response = self.app.put(url('user', id=account.user_id), expected)
    self.checkSessionFlash(response, 'User updated successfully')

    # Re-read the user and add the same blank password fields so both dicts
    # are comparable key for key.
    actual = User.get_by_username(self.test_user_1).get_api_data()
    actual['password_confirmation'] = ''
    actual['new_password'] = ''

    self.assertEqual(expected, actual)
r691 | ||||
def test_delete(self):
    """Create a user with the fixture, delete it via the app, check the flash."""
    self.log_user()
    username = 'newtestuserdeleteme'
    fixture.create_user(name=username)

    doomed = Session().query(User).filter(User.username == username).one()
    target = url('user', id=doomed.user_id)
    response = self.app.delete(target)
    self.checkSessionFlash(response, 'Successfully deleted user')
r691 | ||||
def test_show(self):
    """Smoke test: request the ``user`` route for id 1.

    Unlike the neighbouring tests this one does not call ``log_user()``
    first, and the response is not inspected.
    """
    self.app.get(url('user', id=1))
def test_edit(self):
    """Smoke test: request the ``edit_user`` route for ``TEST_USER_ADMIN_LOGIN``."""
    self.log_user()
    admin = User.get_by_username(TEST_USER_ADMIN_LOGIN)
    # Only the request is exercised; the response is not inspected.
    self.app.get(url('edit_user', id=admin.user_id))
def test_add_perm_create_repo(self):
    """Submit the perms form with ``create_repo_perm`` set and check the result.

    A throwaway ``dummy`` user is created for the test and always deleted
    again, even when an assertion fails.
    """
    self.log_user()
    perm_none = Permission.get_by_key('hg.create.none')
    perm_create = Permission.get_by_key('hg.create.repository')

    user = UserModel().create_or_update(username='dummy', password='qwe',
                                        email='dummy', firstname='a',
                                        lastname='b')
    Session().commit()
    uid = user.user_id

    try:
        # A freshly created user has neither permission attached directly.
        for perm in (perm_none, perm_create):
            self.assertEqual(UserModel().has_perm(user, perm), False)

        self.app.post(url('edit_user_perms', id=uid),
                      params={'_method': 'put', 'create_repo_perm': True})

        # Look the permissions up again after the request before asserting.
        perm_none = Permission.get_by_key('hg.create.none')
        perm_create = Permission.get_by_key('hg.create.repository')

        # After the update only ``hg.create.repository`` must be set.
        self.assertEqual(UserModel().has_perm(uid, perm_none), False)
        self.assertEqual(UserModel().has_perm(uid, perm_create), True)
    finally:
        UserModel().delete(uid)
        Session().commit()
r1758 | ||||
def test_revoke_perm_create_repo(self):
    """Submit the perms form without ``create_repo_perm`` and check the result.

    A throwaway ``dummy`` user is created for the test and always deleted
    again, even when an assertion fails.
    """
    self.log_user()
    perm_none = Permission.get_by_key('hg.create.none')
    perm_create = Permission.get_by_key('hg.create.repository')

    user = UserModel().create_or_update(username='dummy', password='qwe',
                                        email='dummy', firstname='a',
                                        lastname='b')
    Session().commit()
    uid = user.user_id

    try:
        # A freshly created user has neither permission attached directly.
        for perm in (perm_none, perm_create):
            self.assertEqual(UserModel().has_perm(user, perm), False)

        self.app.post(url('edit_user_perms', id=uid),
                      params={'_method': 'put'})

        # Look the permissions up again after the request before asserting.
        perm_none = Permission.get_by_key('hg.create.none')
        perm_create = Permission.get_by_key('hg.create.repository')

        # After the update only ``hg.create.none`` must be set.
        self.assertEqual(UserModel().has_perm(uid, perm_none), True)
        self.assertEqual(UserModel().has_perm(uid, perm_create), False)
    finally:
        UserModel().delete(uid)
        Session().commit()
def test_add_perm_fork_repo(self):
    """Submit the perms form with the fork permission set and check the result.

    A throwaway ``dummy`` user is created for the test and always deleted
    again, even when an assertion fails.
    """
    self.log_user()
    perm_none = Permission.get_by_key('hg.fork.none')
    perm_fork = Permission.get_by_key('hg.fork.repository')

    user = UserModel().create_or_update(username='dummy', password='qwe',
                                        email='dummy', firstname='a',
                                        lastname='b')
    Session().commit()
    uid = user.user_id

    try:
        # A freshly created user has neither fork permission attached directly.
        self.assertEqual(UserModel().has_perm(user, perm_none), False)
        self.assertEqual(UserModel().has_perm(user, perm_fork), False)

        # Previously this posted ``create_repo_perm`` and asserted on the
        # ``hg.create.*`` permissions, so the fork permission was never
        # exercised.
        # NOTE(review): assumes the perms form exposes ``fork_repo_perm``
        # alongside ``create_repo_perm`` - confirm against the form
        # definition.
        self.app.post(url('edit_user_perms', id=uid),
                      params=dict(_method='put',
                                  fork_repo_perm=True))

        # Look the permissions up again after the request before asserting.
        perm_none = Permission.get_by_key('hg.fork.none')
        perm_fork = Permission.get_by_key('hg.fork.repository')

        # After the update only ``hg.fork.repository`` must be set.
        self.assertEqual(UserModel().has_perm(uid, perm_none), False)
        self.assertEqual(UserModel().has_perm(uid, perm_fork), True)
    finally:
        UserModel().delete(uid)
        Session().commit()
def test_revoke_perm_fork_repo(self):
    """Submit the perms form without a fork permission and check the result.

    A throwaway ``dummy`` user is created for the test and always deleted
    again, even when an assertion fails.
    """
    self.log_user()
    perm_none = Permission.get_by_key('hg.fork.none')
    perm_fork = Permission.get_by_key('hg.fork.repository')

    user = UserModel().create_or_update(username='dummy', password='qwe',
                                        email='dummy', firstname='a',
                                        lastname='b')
    Session().commit()
    uid = user.user_id

    try:
        # A freshly created user has neither fork permission attached directly.
        self.assertEqual(UserModel().has_perm(user, perm_none), False)
        self.assertEqual(UserModel().has_perm(user, perm_fork), False)

        self.app.post(url('edit_user_perms', id=uid),
                      params=dict(_method='put'))

        # Previously the assertions below checked the ``hg.create.*``
        # permissions, so the fork permission was never verified.
        # NOTE(review): assumes an update without a fork field stores
        # ``hg.fork.none``, mirroring the create-repository case - confirm.
        perm_none = Permission.get_by_key('hg.fork.none')
        perm_fork = Permission.get_by_key('hg.fork.repository')

        # After the update only ``hg.fork.none`` must be set.
        self.assertEqual(UserModel().has_perm(uid, perm_none), True)
        self.assertEqual(UserModel().has_perm(uid, perm_fork), False)
    finally:
        UserModel().delete(uid)
        Session().commit()
r691 | ||||
Bradley M. Kuhn
|
def test_ips(self):
    """The IP page of ``TEST_USER_REGULAR_LOGIN`` reports that all IPs are allowed."""
    self.log_user()
    regular = User.get_by_username(TEST_USER_REGULAR_LOGIN)
    page = self.app.get(url('edit_user_ips', id=regular.user_id))
    page.mustcontain('All IP addresses are allowed')
@parameterized.expand([
    ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
    ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
    ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
    ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
    ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
    ('127_bad_ip', 'foobar', 'foobar', True),
])
def test_add_ip(self, test_name, ip, ip_range, failure):
    """Add an IP restriction for the regular test user and check the IP page.

    :param test_name: label for the generated test case (not used in the body).
    :param ip: value submitted as ``new_ip``.
    :param ip_range: range text expected on the page when the IP is accepted.
    :param failure: when true, the submission must be rejected with a flash
        message and neither ``ip`` nor ``ip_range`` may appear on the page.
    """
    self.log_user()
    user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
    user_id = user.user_id

    # Everything that can fail runs inside try/finally so stored IP rows are
    # always removed and cannot leak into tests that expect an empty list.
    try:
        response = self.app.put(url('edit_user_ips', id=user_id),
                                params=dict(new_ip=ip))

        if failure:
            self.checkSessionFlash(response, 'Please enter a valid IPv4 or IpV6 address')
            response = self.app.get(url('edit_user_ips', id=user_id))
            response.mustcontain(no=[ip])
            response.mustcontain(no=[ip_range])
        else:
            response = self.app.get(url('edit_user_ips', id=user_id))
            response.mustcontain(ip)
            response.mustcontain(ip_range)
    finally:
        ## cleanup
        for del_ip in UserIpMap.query().filter(UserIpMap.user_id == user_id).all():
            Session().delete(del_ip)
        Session().commit()
def test_delete_ip(self):
    """Add an IP via the model, delete it via the app, and check the IP page."""
    self.log_user()
    user_id = User.get_by_username(TEST_USER_REGULAR_LOGIN).user_id
    ip = '127.0.0.1/32'
    ip_range = '127.0.0.1 - 127.0.0.1'

    added = UserModel().add_extra_ip(user_id, ip)
    Session().commit()
    added_id = added.ip_id

    # The freshly added entry and its range are listed.
    page = self.app.get(url('edit_user_ips', id=user_id))
    for needle in (ip, ip_range):
        page.mustcontain(needle)

    self.app.post(url('edit_user_ips', id=user_id),
                  params={'_method': 'delete', 'del_ip_id': added_id})

    # After deletion the page no longer shows the entry or its range.
    page = self.app.get(url('edit_user_ips', id=user_id))
    page.mustcontain('All IP addresses are allowed')
    for needle in (ip, ip_range):
        page.mustcontain(no=[needle])
def test_api_keys(self):
    """The API keys page shows the user's ``api_key`` and 'expires: never'."""
    self.log_user()
    regular = User.get_by_username(TEST_USER_REGULAR_LOGIN)
    page = self.app.get(url('edit_user_api_keys', id=regular.user_id))
    page.mustcontain(regular.api_key)
    page.mustcontain('expires: never')
@parameterized.expand([
    ('forever', -1),
    ('5mins', 60 * 5),
    ('30days', 60 * 60 * 24 * 30),
])
def test_add_api_keys(self, desc, lifetime):
    """Create an API key through the form and check it is listed.

    :param desc: value submitted as ``description``.
    :param lifetime: value submitted as ``lifetime``.
    """
    self.log_user()
    user_id = User.get_by_username(TEST_USER_REGULAR_LOGIN).user_id

    form_data = {'_method': 'put', 'description': desc, 'lifetime': lifetime}
    response = self.app.post(url('edit_user_api_keys', id=user_id), form_data)
    self.checkSessionFlash(response, 'Api key successfully created')

    try:
        page = response.follow()
        # Every entry of ``user.api_keys`` must appear on the page.
        for key in User.get(user_id).api_keys:
            page.mustcontain(key)
    finally:
        # Remove all extra keys of this user so other tests start clean.
        stale_keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
        for row in stale_keys:
            Session().delete(row)
            Session().commit()
def test_remove_api_key(self):
    """Create one API key via the form, delete it again, and count the rows."""
    self.log_user()
    user_id = User.get_by_username(TEST_USER_REGULAR_LOGIN).user_id

    create_form = {'_method': 'put', 'description': 'desc', 'lifetime': -1}
    response = self.app.post(url('edit_user_api_keys', id=user_id), create_form)
    self.checkSessionFlash(response, 'Api key successfully created')
    response.follow()

    # Exactly one key row must exist now; delete it by its key value.
    stored = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
    self.assertEqual(1, len(stored))

    delete_form = {'_method': 'delete', 'del_api_key': stored[0].api_key}
    response = self.app.post(url('edit_user_api_keys', id=user_id), delete_form)
    self.checkSessionFlash(response, 'Api key successfully deleted')

    remaining = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
    self.assertEqual(0, len(remaining))
def test_reset_main_api_key(self):
    """Reset the user's builtin API key and check the old value disappears."""
    self.log_user()
    regular = User.get_by_username(TEST_USER_REGULAR_LOGIN)
    user_id = regular.user_id
    old_key = regular.api_key

    # Before the reset the current key is listed and never expires.
    page = self.app.get(url('edit_user_api_keys', id=user_id))
    page.mustcontain(old_key)
    page.mustcontain('expires: never')

    reset_form = {'_method': 'delete', 'del_api_key_builtin': old_key}
    response = self.app.post(url('edit_user_api_keys', id=user_id), reset_form)
    self.checkSessionFlash(response, 'Api key successfully reset')

    # The page reached after the redirect must not show the old key any more.
    page = response.follow()
    page.mustcontain(no=[old_key])