# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import pytest

from rhodecode.lib import helpers as h
from rhodecode.model.db import (
    User, UserEmailMap, UserFollowing, Repository, UserApiKeys)
from rhodecode.model.meta import Session
from rhodecode.tests import (
    TestController, url, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_EMAIL,
    assert_session_flash)
from rhodecode.tests.fixture import Fixture
from rhodecode.tests.utils import AssertResponse

fixture = Fixture()


class TestMyAccountController(TestController):
    """Functional tests for the "my account" pages.

    Covers the profile view/edit form, the repository, watched and pull
    request listings, additional e-mail management, auth token management
    and the password change form.
    """

    # Username of the dedicated user created by ``test_my_account_update``.
    test_user_1 = 'testme'
    # Usernames created by tests; removed again in ``teardown_class``.
    destroy_users = set()

    @classmethod
    def teardown_class(cls):
        """Destroy every user registered in ``destroy_users``."""
        fixture.destroy_users(cls.destroy_users)

    def test_my_account(self):
        """The account page shows the username and a link to the edit form."""
        self.log_user()
        response = self.app.get(url('my_account'))

        response.mustcontain('test_admin')
        response.mustcontain('href="/_admin/my_account/edit"')

    def test_logout_form_contains_csrf(self, autologin_user, csrf_token):
        """The logout form carries the current CSRF token."""
        response = self.app.get(url('my_account'))
        assert_response = AssertResponse(response)
        element = assert_response.get_element('.logout #csrf_token')
        assert element.value == csrf_token

    def test_my_account_edit(self):
        """The edit form is pre-filled with the username."""
        self.log_user()
        response = self.app.get(url('my_account_edit'))

        response.mustcontain('value="test_admin')

    def test_my_account_my_repos(self):
        """Every repository owned by the admin user is listed."""
        self.log_user()
        response = self.app.get(url('my_account_repos'))
        repos = Repository.query().filter(
            Repository.user == User.get_by_username(
                TEST_USER_ADMIN_LOGIN)).all()
        for repo in repos:
            response.mustcontain('"name_raw": "%s"' % repo.repo_name)

    def test_my_account_my_watched(self):
        """Every repository followed by the admin user is listed."""
        self.log_user()
        response = self.app.get(url('my_account_watched'))

        repos = UserFollowing.query().filter(
            UserFollowing.user == User.get_by_username(
                TEST_USER_ADMIN_LOGIN)).all()
        for repo in repos:
            response.mustcontain(
                '"name_raw": "%s"' % repo.follows_repository.repo_name)

    @pytest.mark.backends("git", "hg")
    def test_my_account_my_pullrequests(self, pr_util):
        """The pull request page is empty first, then shows a created PR."""
        self.log_user()
        response = self.app.get(url('my_account_pullrequests'))
        response.mustcontain('You currently have no open pull requests.')

        pr = pr_util.create_pull_request()
        response = self.app.get(url('my_account_pullrequests'))
        response.mustcontain('Pull request #%d opened' % pr.pull_request_id)

    def test_my_account_my_emails(self):
        """Without extra e-mails the page shows the empty-state message."""
        self.log_user()
        response = self.app.get(url('my_account_emails'))
        response.mustcontain('No additional emails specified')

    def test_my_account_my_emails_add_existing_email(self):
        """Adding an e-mail that is already in use is rejected."""
        self.log_user()
        response = self.app.get(url('my_account_emails'))
        response.mustcontain('No additional emails specified')
        response = self.app.post(
            url('my_account_emails'),
            {'new_email': TEST_USER_REGULAR_EMAIL,
             'csrf_token': self.csrf_token})
        assert_session_flash(
            response, 'This e-mail address is already taken')

    def test_my_account_my_emails_add_mising_email_in_form(self):
        """Posting the form without an e-mail address is rejected."""
        self.log_user()
        response = self.app.get(url('my_account_emails'))
        response.mustcontain('No additional emails specified')
        response = self.app.post(
            url('my_account_emails'),
            {'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Please enter an email address')

    def test_my_account_my_emails_add_remove(self):
        """An additional e-mail can be added and then removed again."""
        self.log_user()
        response = self.app.get(url('my_account_emails'))
        response.mustcontain('No additional emails specified')

        response = self.app.post(
            url('my_account_emails'),
            {'new_email': 'foo@barz.com',
             'csrf_token': self.csrf_token})

        response = self.app.get(url('my_account_emails'))

        # Look up the stored mapping to learn the id used by the delete form.
        email_id = UserEmailMap.query().filter(
            UserEmailMap.user == User.get_by_username(
                TEST_USER_ADMIN_LOGIN)).filter(
            UserEmailMap.email == 'foo@barz.com').one().email_id

        response.mustcontain('foo@barz.com')
        response.mustcontain('<input id="del_email_id" name="del_email_id" '
                             'type="hidden" value="%s" />' % email_id)

        response = self.app.post(
            url('my_account_emails'), {
                'del_email_id': email_id, '_method': 'delete',
                'csrf_token': self.csrf_token})
        assert_session_flash(
            response, 'Removed email address from user account')
        response = self.app.get(url('my_account_emails'))
        response.mustcontain('No additional emails specified')

    @pytest.mark.parametrize(
        "name, attrs", [
            ('firstname', {'firstname': 'new_username'}),
            ('lastname', {'lastname': 'new_username'}),
            ('admin', {'admin': True}),
            ('admin', {'admin': False}),
            ('extern_type', {'extern_type': 'ldap'}),
            ('extern_type', {'extern_type': None}),
            # ('extern_name', {'extern_name': 'test'}),
            # ('extern_name', {'extern_name': None}),
            ('active', {'active': False}),
            ('active', {'active': True}),
            ('email', {'email': 'some@email.com'}),
        ])
    def test_my_account_update(self, name, attrs):
        """Update one profile attribute and compare the stored user data.

        The form is posted with the user's current API data overridden by
        ``attrs``. Afterwards the user's API data must equal the posted
        values, except for the attributes this form is not allowed to
        change, which are expected to keep their original values.
        """
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
                                  email='testme@rhodecode.org',
                                  extern_type='rhodecode',
                                  extern_name=self.test_user_1,
                                  skip_if_exists=True)
        self.destroy_users.add(self.test_user_1)

        params = usr.get_api_data()  # current user data
        user_id = usr.user_id
        self.log_user(username=self.test_user_1, password='qweqwe')

        params.update({'password_confirmation': ''})
        params.update({'new_password': ''})
        params.update({'extern_type': 'rhodecode'})
        params.update({'extern_name': self.test_user_1})
        params.update({'csrf_token': self.csrf_token})

        params.update(attrs)
        # my account page cannot set language param yet, only for admins
        del params['language']
        response = self.app.post(url('my_account'), params)

        assert_session_flash(
            response, 'Your account was updated successfully')

        # From here on ``params`` is turned into the expected API data.
        del params['csrf_token']

        updated_user = User.get_by_username(self.test_user_1)
        updated_params = updated_user.get_api_data()
        updated_params.update({'password_confirmation': ''})
        updated_params.update({'new_password': ''})

        params['last_login'] = updated_params['last_login']
        # my account page cannot set language param yet, only for admins
        # but we get this info from API anyway
        params['language'] = updated_params['language']

        if name == 'email':
            params['emails'] = [attrs['email']]
        if name == 'extern_type':
            # cannot update this via form, expected value is original one
            params['extern_type'] = "rhodecode"
        if name == 'extern_name':
            # cannot update this via form, expected value is original one
            # NOTE: only reached if the commented-out 'extern_name' cases
            # in the parametrize list above are enabled again.
            params['extern_name'] = str(user_id)
        if name == 'active':
            # my account cannot deactivate account
            params['active'] = True
        if name == 'admin':
            # my account cannot make you an admin !
            params['admin'] = False

        assert params == updated_params

    def test_my_account_update_err_email_exists(self):
        """Changing the e-mail to one that is already taken is rejected."""
        self.log_user()

        new_email = 'test_regular@mail.com'  # already existing email
        response = self.app.post(url('my_account'), params={
            'username': 'test_admin',
            'new_password': 'test12',
            'password_confirmation': 'test122',
            'firstname': 'NewName',
            'lastname': 'NewLastname',
            'email': new_email,
            'csrf_token': self.csrf_token,
        })

        response.mustcontain('This e-mail address is already taken')

    def test_my_account_update_err(self):
        """An invalid e-mail and an existing username are both reported."""
        self.log_user('test_regular2', 'test12')

        new_email = 'newmail.pl'
        response = self.app.post(url('my_account'), params={
            'username': 'test_admin',
            'new_password': 'test12',
            'password_confirmation': 'test122',
            'firstname': 'NewName',
            'lastname': 'NewLastname',
            'email': new_email,
            'csrf_token': self.csrf_token,
        })

        response.mustcontain('An email address must contain a single @')
        from rhodecode.model import validators
        # Build the expected message from the validator's own template so
        # the assertion follows the wording used by the application.
        msg = validators.ValidUsername(
            edit=False, old_data={})._messages['username_exists']
        msg = h.html_escape(msg % {'username': 'test_admin'})
        response.mustcontain(u"%s" % msg)

    def test_my_account_auth_tokens(self):
        """The auth tokens page shows the built-in, never-expiring key."""
        usr = self.log_user('test_regular2', 'test12')
        user = User.get(usr['user_id'])
        response = self.app.get(url('my_account_auth_tokens'))
        response.mustcontain(user.api_key)
        response.mustcontain('expires: never')

    @pytest.mark.parametrize("desc, lifetime", [
        ('forever', -1),
        ('5mins', 60*5),
        ('30days', 60*60*24*30),
    ])
    def test_my_account_add_auth_tokens(self, desc, lifetime):
        """A new auth token can be created and is listed afterwards."""
        usr = self.log_user('test_regular2', 'test12')
        response = self.app.post(
            url('my_account_auth_tokens'),
            {'description': desc, 'lifetime': lifetime,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully created')
        try:
            response = response.follow()
            user = User.get(usr['user_id'])
            for auth_token in user.auth_tokens:
                response.mustcontain(auth_token)
        finally:
            # Always remove the created tokens, even when an assertion
            # above failed, so later tests do not see them.
            for auth_token in UserApiKeys.query().all():
                Session().delete(auth_token)
            Session().commit()

    def test_my_account_remove_auth_token(self):
        """A created auth token can be deleted again."""
        # TODO: without this cleanup it fails when run with the whole
        # test suite, so there must be some interference with other tests.
        UserApiKeys.query().delete()

        self.log_user('test_regular2', 'test12')
        response = self.app.post(
            url('my_account_auth_tokens'),
            {'description': 'desc', 'lifetime': -1,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully created')
        response = response.follow()

        # now delete our key
        keys = UserApiKeys.query().all()
        assert 1 == len(keys)

        response = self.app.post(
            url('my_account_auth_tokens'),
            {'_method': 'delete', 'del_auth_token': keys[0].api_key,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully deleted')
        keys = UserApiKeys.query().all()
        assert 0 == len(keys)

    def test_my_account_reset_main_auth_token(self):
        """Resetting the built-in token removes the old key from the page."""
        usr = self.log_user('test_regular2', 'test12')
        user = User.get(usr['user_id'])
        api_key = user.api_key
        response = self.app.get(url('my_account_auth_tokens'))
        response.mustcontain(api_key)
        response.mustcontain('expires: never')

        response = self.app.post(
            url('my_account_auth_tokens'),
            {'_method': 'delete', 'del_auth_token_builtin': api_key,
             'csrf_token': self.csrf_token})
        assert_session_flash(response, 'Auth token successfully reset')
        response = response.follow()
        response.mustcontain(no=[api_key])

    def test_password_is_updated_in_session_on_password_change(
            self, user_util):
        """After a password change the session holds a different hash."""
        old_password = 'abcdef123'
        new_password = 'abcdef124'

        user = user_util.create_user(password=old_password)
        session = self.log_user(user.username, old_password)
        old_password_hash = session['password']

        form_data = {
            'current_password': old_password,
            'new_password': new_password,
            'new_password_confirmation': new_password,
            'csrf_token': self.csrf_token
        }
        self.app.post(url('my_account_password'), form_data)

        # Any follow-up request exposes the session as updated by the POST.
        response = self.app.get(url('home'))
        new_password_hash = response.session['rhodecode_user']['password']

        assert old_password_hash != new_password_hash