# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016  RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import pytest
from sqlalchemy.orm.exc import NoResultFound

from rhodecode.lib.auth import check_password
from rhodecode.lib import helpers as h
from rhodecode.model import validators
from rhodecode.model.db import User, UserIpMap, UserApiKeys
from rhodecode.model.meta import Session
from rhodecode.model.user import UserModel
from rhodecode.tests import (
    TestController, url, link_to, TEST_USER_ADMIN_LOGIN,
    TEST_USER_REGULAR_LOGIN, assert_session_flash)
from rhodecode.tests.fixture import Fixture
from rhodecode.tests.utils import AssertResponse

fixture = Fixture()


class TestAdminUsersController(TestController):
    test_user_1 = 'testme'
    destroy_users = set()

    @classmethod
    def teardown_method(cls, method):
        fixture.destroy_users(cls.destroy_users)

    def test_index(self):
        self.log_user()
        self.app.get(url('users'))

    def test_create(self):
        self.log_user()
        username = 'newtestuser'
        password = 'test12'
        password_confirmation = password
        name = 'name'
        lastname = 'lastname'
        email = 'mail@mail.com'

        response = self.app.get(url('new_user'))

        response = self.app.post(url('users'), params={
            'username': username,
            'password': password,
            'password_confirmation': password_confirmation,
            'firstname': name,
            'active': True,
            'lastname': lastname,
            'extern_name': 'rhodecode',
            'extern_type': 'rhodecode',
            'email': email,
            'csrf_token': self.csrf_token,
        })
        user_link = link_to(
            username,
            url('edit_user', user_id=User.get_by_username(username).user_id))
        assert_session_flash(response, 'Created user %s' % (user_link,))
        self.destroy_users.add(username)

        new_user = User.query().filter(User.username == username).one()

        assert new_user.username == username
        assert check_password(password, new_user.password)
        assert new_user.name == name
        assert new_user.lastname == lastname
        assert new_user.email == email

        response.follow()
        response = response.follow()
        response.mustcontain(username)

    def test_create_err(self):
        self.log_user()
        username = 'new_user'
        password = ''
        name = 'name'
        lastname = 'lastname'
        email = 'errmail.com'

        response = self.app.get(url('new_user'))

        response = self.app.post(url('users'), params={
            'username': username,
            'password': password,
            'name': name,
            'active': False,
            'lastname': lastname,
            'email': email,
            'csrf_token': self.csrf_token,
        })

        msg = validators.ValidUsername(
            False, {})._messages['system_invalid_username']
        msg = h.html_escape(msg % {'username': 'new_user'})
        response.mustcontain('<span class="error-message">%s</span>' % msg)
        response.mustcontain(
            '<span class="error-message">Please enter a value</span>')
        response.mustcontain(
            '<span class="error-message">An email address must contain a'
            ' single @</span>')

        def get_user():
            Session().query(User).filter(User.username == username).one()

        with pytest.raises(NoResultFound):
            get_user()

    def test_new(self):
        self.log_user()
        self.app.get(url('new_user'))

    @pytest.mark.parametrize("name, attrs", [
        ('firstname', {'firstname': 'new_username'}),
        ('lastname', {'lastname': 'new_username'}),
        ('admin', {'admin': True}),
        ('admin', {'admin': False}),
        ('extern_type', {'extern_type': 'ldap'}),
        ('extern_type', {'extern_type': None}),
        ('extern_name', {'extern_name': 'test'}),
        ('extern_name', {'extern_name': None}),
        ('active', {'active': False}),
        ('active', {'active': True}),
        ('email', {'email': 'some@email.com'}),
        ('language', {'language': 'de'}),
        ('language', {'language': 'en'}),
        # ('new_password', {'new_password': 'foobar123',
        #                   'password_confirmation': 'foobar123'})
        ])
    def test_update(self, name, attrs):
        self.log_user()
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
                                  email='testme@rhodecode.org',
                                  extern_type='rhodecode',
                                  extern_name=self.test_user_1,
                                  skip_if_exists=True)
        Session().commit()
        self.destroy_users.add(self.test_user_1)
        params = usr.get_api_data()
        cur_lang = params['language'] or 'en'
        params.update({
            'password_confirmation': '',
            'new_password': '',
            'language': cur_lang,
            '_method': 'put',
            'csrf_token': self.csrf_token,
        })
        params.update({'new_password': ''})
        params.update(attrs)
        if name == 'email':
            params['emails'] = [attrs['email']]
        elif name == 'extern_type':
            # cannot update this via form, expected value is original one
            params['extern_type'] = "rhodecode"
        elif name == 'extern_name':
            # cannot update this via form, expected value is original one
            params['extern_name'] = self.test_user_1
            # special case since this user is not
            # logged in yet his data is not filled
            # so we use creation data

        response = self.app.post(url('user', user_id=usr.user_id), params)
        assert response.status_int == 302
        assert_session_flash(response, 'User updated successfully')

        updated_user = User.get_by_username(self.test_user_1)
        updated_params = updated_user.get_api_data()
        updated_params.update({'password_confirmation': ''})
        updated_params.update({'new_password': ''})

        del params['_method']
        del params['csrf_token']
        assert params == updated_params

    def test_update_and_migrate_password(
            self, autologin_user, real_crypto_backend):
        from rhodecode.lib import auth

        # create new user, with sha256 password
        temp_user = 'test_admin_sha256'
        user = fixture.create_user(temp_user)
        user.password = auth._RhodeCodeCryptoSha256().hash_create(
            b'test123')
        Session().add(user)
        Session().commit()
        self.destroy_users.add('test_admin_sha256')

        params = user.get_api_data()

        params.update({
            'password_confirmation': 'qweqwe123',
            'new_password': 'qweqwe123',
            'language': 'en',
            '_method': 'put',
            'csrf_token': autologin_user.csrf_token,
        })

        response = self.app.post(url('user', user_id=user.user_id), params)
        assert response.status_int == 302
        assert_session_flash(response, 'User updated successfully')

        # new password should be bcrypted, after log-in and transfer
        user = User.get_by_username(temp_user)
        assert user.password.startswith('$')

        updated_user = User.get_by_username(temp_user)
        updated_params = updated_user.get_api_data()
        updated_params.update({'password_confirmation': 'qweqwe123'})
        updated_params.update({'new_password': 'qweqwe123'})

        del params['_method']
        del params['csrf_token']
        assert params == updated_params

    def test_delete(self):
        self.log_user()
        username = 'newtestuserdeleteme'

        fixture.create_user(name=username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'csrf_token': self.csrf_token})

        assert_session_flash(response, 'Successfully deleted user')

    def test_delete_owner_of_repository(self):
        self.log_user()
        username = 'newtestuserdeleteme_repo_owner'
        obj_name = 'test_repo'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
              'Switch owners or remove those repositories:%s' % (username,
                                                                 obj_name)
        assert_session_flash(response, msg)
        fixture.destroy_repo(obj_name)

    def test_delete_owner_of_repository_detaching(self):
        self.log_user()
        username = 'newtestuserdeleteme_repo_owner_detach'
        obj_name = 'test_repo'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'user_repos': 'detach',
                                         'csrf_token': self.csrf_token})

        msg = 'Detached 1 repositories'
        assert_session_flash(response, msg)
        fixture.destroy_repo(obj_name)

    def test_delete_owner_of_repository_deleting(self):
        self.log_user()
        username = 'newtestuserdeleteme_repo_owner_delete'
        obj_name = 'test_repo'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'user_repos': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'Deleted 1 repositories'
        assert_session_flash(response, msg)

    def test_delete_owner_of_repository_group(self):
        self.log_user()
        username = 'newtestuserdeleteme_repo_group_owner'
        obj_name = 'test_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo_group(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
              'Switch owners or remove those repository groups:%s' % (username,
                                                                      obj_name)
        assert_session_flash(response, msg)
        fixture.destroy_repo_group(obj_name)

    def test_delete_owner_of_repository_group_detaching(self):
        self.log_user()
        username = 'newtestuserdeleteme_repo_group_owner_detach'
        obj_name = 'test_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo_group(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'user_repo_groups': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'Deleted 1 repository groups'
        assert_session_flash(response, msg)

    def test_delete_owner_of_repository_group_deleting(self):
        self.log_user()
        username = 'newtestuserdeleteme_repo_group_owner_delete'
        obj_name = 'test_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_repo_group(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'user_repo_groups': 'detach',
                                         'csrf_token': self.csrf_token})

        msg = 'Detached 1 repository groups'
        assert_session_flash(response, msg)
        fixture.destroy_repo_group(obj_name)

    def test_delete_owner_of_user_group(self):
        self.log_user()
        username = 'newtestuserdeleteme_user_group_owner'
        obj_name = 'test_user_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_user_group(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
              'Switch owners or remove those user groups:%s' % (username,
                                                                obj_name)
        assert_session_flash(response, msg)
        fixture.destroy_user_group(obj_name)

    def test_delete_owner_of_user_group_detaching(self):
        self.log_user()
        username = 'newtestuserdeleteme_user_group_owner_detaching'
        obj_name = 'test_user_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_user_group(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        try:
            response = self.app.post(url('user', user_id=new_user.user_id),
                                     params={'_method': 'delete',
                                             'user_user_groups': 'detach',
                                             'csrf_token': self.csrf_token})

            msg = 'Detached 1 user groups'
            assert_session_flash(response, msg)
        finally:
            fixture.destroy_user_group(obj_name)

    def test_delete_owner_of_user_group_deleting(self):
        self.log_user()
        username = 'newtestuserdeleteme_user_group_owner_deleting'
        obj_name = 'test_user_group'
        usr = fixture.create_user(name=username)
        self.destroy_users.add(username)
        fixture.create_user_group(obj_name, cur_user=usr.username)

        new_user = Session().query(User)\
            .filter(User.username == username).one()
        response = self.app.post(url('user', user_id=new_user.user_id),
                                 params={'_method': 'delete',
                                         'user_user_groups': 'delete',
                                         'csrf_token': self.csrf_token})

        msg = 'Deleted 1 user groups'
        assert_session_flash(response, msg)

    def test_show(self):
        self.app.get(url('user', user_id=1))

    def test_edit(self):
        self.log_user()
        user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
        self.app.get(url('edit_user', user_id=user.user_id))

    @pytest.mark.parametrize(
        'repo_create, repo_create_write, user_group_create, repo_group_create,'
        'fork_create, inherit_default_permissions, expect_error,'
        'expect_form_error', [
            ('hg.create.none', 'hg.create.write_on_repogroup.false',
             'hg.usergroup.create.false', 'hg.repogroup.create.false',
             'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
            ('hg.create.repository', 'hg.create.write_on_repogroup.false',
             'hg.usergroup.create.false', 'hg.repogroup.create.false',
             'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
            ('hg.create.repository', 'hg.create.write_on_repogroup.true',
             'hg.usergroup.create.true', 'hg.repogroup.create.true',
             'hg.fork.repository', 'hg.inherit_default_perms.false', False,
             False),
            ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
             'hg.usergroup.create.true', 'hg.repogroup.create.true',
             'hg.fork.repository', 'hg.inherit_default_perms.false', False,
             True),
            ('', '', '', '', '', '', True, False),
        ])
    def test_global_perms_on_user(
            self, repo_create, repo_create_write, user_group_create,
            repo_group_create, fork_create, expect_error, expect_form_error,
            inherit_default_permissions):
        self.log_user()
        user = fixture.create_user('dummy')
        uid = user.user_id

        # ENABLE REPO CREATE ON A GROUP
        perm_params = {
            'inherit_default_permissions': False,
            'default_repo_create': repo_create,
            'default_repo_create_on_write': repo_create_write,
            'default_user_group_create': user_group_create,
            'default_repo_group_create': repo_group_create,
            'default_fork_create': fork_create,
            'default_inherit_default_permissions': inherit_default_permissions,
            '_method': 'put',
            'csrf_token': self.csrf_token,
        }
        response = self.app.post(
            url('edit_user_global_perms', user_id=uid),
            params=perm_params)

        if expect_form_error:
            assert response.status_int == 200
            response.mustcontain('Value must be one of')
        else:
            if expect_error:
                msg = 'An error occurred during permissions saving'
            else:
                msg = 'User global permissions updated successfully'
                ug = User.get(uid)
                del perm_params['_method']
                del perm_params['inherit_default_permissions']
                del perm_params['csrf_token']
                assert perm_params == ug.get_default_perms()
            assert_session_flash(response, msg)
        fixture.destroy_user(uid)

    def test_global_permissions_initial_values(self, user_util):
        self.log_user()
        user = user_util.create_user()
        uid = user.user_id
        response = self.app.get(url('edit_user_global_perms', user_id=uid))
        default_user = User.get_default_user()
        default_permissions = default_user.get_default_perms()
        assert_response = AssertResponse(response)
        expected_permissions = (
            'default_repo_create', 'default_repo_create_on_write',
            'default_fork_create', 'default_repo_group_create',
            'default_user_group_create', 'default_inherit_default_permissions')
        for permission in expected_permissions:
            css_selector = '[name={}][checked=checked]'.format(permission)
            element = assert_response.get_element(css_selector)
            assert element.value == default_permissions[permission]

    def test_ips(self):
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        response = self.app.get(url('edit_user_ips', user_id=user.user_id))
        response.mustcontain('All IP addresses are allowed')

    @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
        ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
        ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
        ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
        ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
        ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
        ('127_bad_ip', 'foobar', 'foobar', True),
    ])
    def test_add_ip(self, test_name, ip, ip_range, failure):
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id

        response = self.app.post(url('edit_user_ips', user_id=user_id),
                                 params={'new_ip': ip, '_method': 'put',
                                         'csrf_token': self.csrf_token})

        if failure:
            assert_session_flash(
                response, 'Please enter a valid IPv4 or IpV6 address')
            response = self.app.get(url('edit_user_ips', user_id=user_id))
            response.mustcontain(no=[ip])
            response.mustcontain(no=[ip_range])

        else:
            response = self.app.get(url('edit_user_ips', user_id=user_id))
            response.mustcontain(ip)
            response.mustcontain(ip_range)

        # cleanup
        for del_ip in UserIpMap.query().filter(
                UserIpMap.user_id == user_id).all():
            Session().delete(del_ip)
            Session().commit()

    def test_delete_ip(self):
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id
        ip = '127.0.0.1/32'
        ip_range = '127.0.0.1 - 127.0.0.1'
        new_ip = UserModel().add_extra_ip(user_id, ip)
        Session().commit()
        new_ip_id = new_ip.ip_id

        response = self.app.get(url('edit_user_ips', user_id=user_id))
        response.mustcontain(ip)
        response.mustcontain(ip_range)

        self.app.post(url('edit_user_ips', user_id=user_id),
                      params={'_method': 'delete', 'del_ip_id': new_ip_id,
                              'csrf_token': self.csrf_token})

        response = self.app.get(url('edit_user_ips', user_id=user_id))
        response.mustcontain('All IP addresses are allowed')
        response.mustcontain(no=[ip])
        response.mustcontain(no=[ip_range])

    def test_auth_tokens(self):
        self.log_user()

        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        response = self.app.get(
            url('edit_user_auth_tokens', user_id=user.user_id))
        response.mustcontain(user.api_key)
        response.mustcontain('expires: never')

    @pytest.mark.parametrize("desc, lifetime", [
        ('forever', -1),
        ('5mins', 60*5),
        ('30days', 60*60*24*30),
    ])
    def test_add_auth_token(self, desc, lifetime):
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id

        response = self.app.post(
            url('edit_user_auth_tokens', user_id=user_id),
            {'_method': 'put', 'description': desc, 'lifetime': lifetime,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully created')
        try:
            response = response.follow()
            user = User.get(user_id)
            for auth_token in user.auth_tokens:
                response.mustcontain(auth_token)
        finally:
            for api_key in UserApiKeys.query().filter(
                    UserApiKeys.user_id == user_id).all():
                Session().delete(api_key)
                Session().commit()

    def test_remove_auth_token(self):
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id

        response = self.app.post(
            url('edit_user_auth_tokens', user_id=user_id),
            {'_method': 'put', 'description': 'desc', 'lifetime': -1,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully created')
        response = response.follow()

        # now delete our key
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
        assert 1 == len(keys)

        response = self.app.post(
            url('edit_user_auth_tokens', user_id=user_id),
            {'_method': 'delete', 'del_auth_token': keys[0].api_key,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully deleted')
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
        assert 0 == len(keys)

    def test_reset_main_auth_token(self):
        self.log_user()
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
        user_id = user.user_id
        api_key = user.api_key
        response = self.app.get(url('edit_user_auth_tokens', user_id=user_id))
        response.mustcontain(api_key)
        response.mustcontain('expires: never')

        response = self.app.post(
            url('edit_user_auth_tokens', user_id=user_id),
            {'_method': 'delete', 'del_auth_token_builtin': api_key,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully reset')
        response = response.follow()
        response.mustcontain(no=[api_key])