# HG changeset patch # User Marcin Kuzminski # Date 2017-02-26 20:28:09 # Node ID 9ea7077dd424a763ca8711244d73147b7d3a4d07 # Parent 337b0a059dafb0bbf2c3c1a6f66db804f660e60a password-reset: strengthen security on password reset logic. - generate token that has special password reset role - set 10 minute expiration on the token - add some sleep to prevent bruteforcing attacks - use implicit messages to prevent user email discovery attacks diff --git a/rhodecode/login/views.py b/rhodecode/login/views.py --- a/rhodecode/login/views.py +++ b/rhodecode/login/views.py @@ -18,6 +18,7 @@ # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ +import time import collections import datetime import formencode @@ -37,9 +38,10 @@ from rhodecode.lib.auth import ( from rhodecode.lib.base import get_ip_addr from rhodecode.lib.exceptions import UserCreationError from rhodecode.lib.utils2 import safe_str -from rhodecode.model.db import User +from rhodecode.model.db import User, UserApiKeys from rhodecode.model.forms import LoginForm, RegisterForm, PasswordResetForm from rhodecode.model.meta import Session +from rhodecode.model.auth_token import AuthTokenModel from rhodecode.model.settings import SettingsModel from rhodecode.model.user import UserModel from rhodecode.translation import _ @@ -289,17 +291,24 @@ class LoginView(object): 'errors': {}, } + # always send implicit message to prevent from discovery of + # matching emails + msg = _('If such email exists, a password reset link was sent to it.') + if self.request.POST: + if h.HasPermissionAny('hg.password_reset.disabled')(): + _email = self.request.POST.get('email', '') + log.error('Failed attempt to reset password for `%s`.', _email) + self.session.flash(_('Password reset has been disabled.'), + queue='error') + return HTTPFound(self.request.route_path('reset_password')) + password_reset_form = PasswordResetForm()() try: 
form_result = password_reset_form.to_python( self.request.params) - if h.HasPermissionAny('hg.password_reset.disabled')(): - log.error('Failed attempt to reset password for %s.', form_result['email'] ) - self.session.flash( - _('Password reset has been disabled.'), - queue='error') - return HTTPFound(self.request.route_path('reset_password')) + user_email = form_result['email'] + if captcha.active: response = submit( self.request.params.get('recaptcha_challenge_field'), @@ -310,43 +319,66 @@ class LoginView(object): _value = form_result _msg = _('Bad captcha') error_dict = {'recaptcha_field': _msg} - raise formencode.Invalid(_msg, _value, None, - error_dict=error_dict) + raise formencode.Invalid( + _msg, _value, None, error_dict=error_dict) + # Generate reset URL and send mail. + user = User.get_by_email(user_email) - # Generate reset URL and send mail. - user_email = form_result['email'] - user = User.get_by_email(user_email) + # generate password reset token that expires in 10minutes + desc = 'Generated token for password reset from {}'.format( + datetime.datetime.now().isoformat()) + reset_token = AuthTokenModel().create( + user, lifetime=10, + description=desc, + role=UserApiKeys.ROLE_PASSWORD_RESET) + Session().commit() + + log.debug('Successfully created password recovery token') password_reset_url = self.request.route_url( 'reset_password_confirmation', - _query={'key': user.api_key}) + _query={'key': reset_token.api_key}) UserModel().reset_password_link( form_result, password_reset_url) - # Display success message and redirect. 
- self.session.flash( - _('Your password reset link was sent'), - queue='success') - return HTTPFound(self.request.route_path('login')) + self.session.flash(msg, queue='success') + return HTTPFound(self.request.route_path('reset_password')) except formencode.Invalid as errors: render_ctx.update({ 'defaults': errors.value, - 'errors': errors.error_dict, }) + log.debug('faking response on invalid password reset') + # make this take 2s, to prevent brute forcing. + time.sleep(2) + self.session.flash(msg, queue='success') + return HTTPFound(self.request.route_path('reset_password')) return render_ctx @view_config(route_name='reset_password_confirmation', request_method='GET') def password_reset_confirmation(self): + if self.request.GET and self.request.GET.get('key'): + # make this take 2s, to prevent brute forcing. + time.sleep(2) + + token = AuthTokenModel().get_auth_token( + self.request.GET.get('key')) + + # verify token is the correct role + if token is None or token.role != UserApiKeys.ROLE_PASSWORD_RESET: + log.debug('Got token with role:%s expected is %s', + getattr(token, 'role', 'EMPTY_TOKEN'), + UserApiKeys.ROLE_PASSWORD_RESET) + self.session.flash( + _('Given reset token is invalid'), queue='error') + return HTTPFound(self.request.route_path('reset_password')) + try: - user = User.get_by_auth_token(self.request.GET.get('key')) - password_reset_url = self.request.route_url( - 'reset_password_confirmation', - _query={'key': user.api_key}) - data = {'email': user.email} - UserModel().reset_password(data, password_reset_url) + owner = token.user + data = {'email': owner.email, 'token': token.api_key} + UserModel().reset_password(data) self.session.flash( _('Your password reset was successful, ' 'a new password has been sent to your email'), diff --git a/rhodecode/model/auth_token.py b/rhodecode/model/auth_token.py --- a/rhodecode/model/auth_token.py +++ b/rhodecode/model/auth_token.py @@ -41,7 +41,7 @@ class AuthTokenModel(BaseModel): """ :param user: user or 
user_id :param description: description of ApiKey - :param lifetime: expiration time in seconds + :param lifetime: expiration time in minutes :param role: role for the apikey """ from rhodecode.lib.auth import generate_auth_token @@ -85,3 +85,13 @@ class AuthTokenModel(BaseModel): .filter(or_(UserApiKeys.expires == -1, UserApiKeys.expires >= time.time())) return user_auth_tokens + + def get_auth_token(self, auth_token): + auth_token = UserApiKeys.query().filter( + UserApiKeys.api_key == auth_token) + auth_token = auth_token \ + .filter(or_(UserApiKeys.expires == -1, + UserApiKeys.expires >= time.time()))\ + .first() + + return auth_token diff --git a/rhodecode/model/db.py b/rhodecode/model/db.py --- a/rhodecode/model/db.py +++ b/rhodecode/model/db.py @@ -943,6 +943,8 @@ class UserApiKeys(Base, BaseModel): ROLE_VCS = 'token_role_vcs' ROLE_API = 'token_role_api' ROLE_FEED = 'token_role_feed' + ROLE_PASSWORD_RESET = 'token_password_reset' + ROLES = [ROLE_ALL, ROLE_HTTP, ROLE_VCS, ROLE_API, ROLE_FEED] user_api_key_id = Column("user_api_key_id", Integer(), nullable=False, unique=True, default=None, primary_key=True) diff --git a/rhodecode/model/user.py b/rhodecode/model/user.py --- a/rhodecode/model/user.py +++ b/rhodecode/model/user.py @@ -538,7 +538,7 @@ class UserModel(BaseModel): return True - def reset_password(self, data, pwd_reset_url): + def reset_password(self, data): from rhodecode.lib.celerylib import tasks, run_task from rhodecode.model.notification import EmailNotificationModel from rhodecode.lib import auth @@ -554,8 +554,15 @@ class UserModel(BaseModel): user.update_userdata(force_password_change=True) Session().add(user) + + # now delete the token in question + UserApiKeys = AuthTokenModel.cls + UserApiKeys().query().filter( + UserApiKeys.api_key == data['token']).delete() + Session().commit() log.info('successfully reset password for `%s`', user_email) + if new_passwd is None: raise Exception('unable to generate new password') @@ -563,7 +570,6 @@ class 
UserModel(BaseModel): email_kwargs = { 'new_password': new_passwd, - 'password_reset_url': pwd_reset_url, 'user': user, 'email': user_email, 'date': datetime.datetime.now() @@ -571,7 +577,8 @@ class UserModel(BaseModel): (subject, headers, email_body, email_body_plaintext) = EmailNotificationModel().render_email( - EmailNotificationModel.TYPE_PASSWORD_RESET_CONFIRMATION, **email_kwargs) + EmailNotificationModel.TYPE_PASSWORD_RESET_CONFIRMATION, + **email_kwargs) recipients = [user_email] diff --git a/rhodecode/templates/email_templates/password_reset.mako b/rhodecode/templates/email_templates/password_reset.mako --- a/rhodecode/templates/email_templates/password_reset.mako +++ b/rhodecode/templates/email_templates/password_reset.mako @@ -16,6 +16,7 @@ There was a request to reset your passwo You can continue, and generate new password by clicking following URL: ${password_reset_url} +This link will be active for 10 minutes. ${self.plaintext_footer()} @@ -28,4 +29,5 @@ There was a request to reset your passwo If you did not request a password reset, please contact your RhodeCode administrator.

${_('Generate new password here')}. +This link will be active for 10 minutes.

diff --git a/rhodecode/templates/email_templates/password_reset_confirmation.mako b/rhodecode/templates/email_templates/password_reset_confirmation.mako --- a/rhodecode/templates/email_templates/password_reset_confirmation.mako +++ b/rhodecode/templates/email_templates/password_reset_confirmation.mako @@ -9,12 +9,11 @@ Your new RhodeCode password <%def name="body_plaintext()" filter="n,trim"> Hi ${user.username}, -There was a request to reset your password using the email address ${email} on ${h.format_date(date)} +Below is your new access password for RhodeCode. *If you didn't do this, please contact your RhodeCode administrator.* -You can continue, and generate new password by clicking following URL: -${password_reset_url} +password: ${new_password} ${self.plaintext_footer()} @@ -27,4 +26,4 @@ Below is your new access password for Rh
If you didn't request a new password, please contact your RhodeCode administrator.

-

password:

+

password:

${new_password}
diff --git a/rhodecode/tests/functional/test_login.py b/rhodecode/tests/functional/test_login.py --- a/rhodecode/tests/functional/test_login.py +++ b/rhodecode/tests/functional/test_login.py @@ -25,15 +25,13 @@ import pytest from rhodecode.config.routing import ADMIN_PREFIX from rhodecode.tests import ( - TestController, assert_session_flash, clear_all_caches, url, - HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS) + assert_session_flash, url, HG_REPO, TEST_USER_ADMIN_LOGIN) from rhodecode.tests.fixture import Fixture from rhodecode.tests.utils import AssertResponse, get_session_from_response -from rhodecode.lib.auth import check_password, generate_auth_token -from rhodecode.lib import helpers as h +from rhodecode.lib.auth import check_password from rhodecode.model.auth_token import AuthTokenModel from rhodecode.model import validators -from rhodecode.model.db import User, Notification +from rhodecode.model.db import User, Notification, UserApiKeys from rhodecode.model.meta import Session fixture = Fixture() @@ -49,7 +47,7 @@ pwd_reset_confirm_url = ADMIN_PREFIX + ' @pytest.mark.usefixtures('app') -class TestLoginController: +class TestLoginController(object): destroy_users = set() @classmethod @@ -374,54 +372,42 @@ class TestLoginController: def test_forgot_password_wrong_mail(self): bad_email = 'marcin@wrongmail.org' response = self.app.post( - pwd_reset_url, - {'email': bad_email, } + pwd_reset_url, {'email': bad_email, } ) + assert_session_flash(response, + 'If such email exists, a password reset link was sent to it.') - msg = validators.ValidSystemEmail()._messages['non_existing_email'] - msg = h.html_escape(msg % {'email': bad_email}) - response.mustcontain() - - def test_forgot_password(self): + def test_forgot_password(self, user_util): response = self.app.get(pwd_reset_url) assert response.status == '200 OK' - username = 'test_password_reset_1' - password = 'qweqwe' - email = 'marcin@python-works.com' - name = 'passwd' - lastname = 'reset' + user = 
user_util.create_user() + user_id = user.user_id + email = user.email - new = User() - new.username = username - new.password = password - new.email = email - new.name = name - new.lastname = lastname - new.api_key = generate_auth_token(username) - Session().add(new) - Session().commit() + response = self.app.post(pwd_reset_url, {'email': email, }) - response = self.app.post(pwd_reset_url, - {'email': email, }) - - assert_session_flash( - response, 'Your password reset link was sent') - - response = response.follow() + assert_session_flash(response, + 'If such email exists, a password reset link was sent to it.') # BAD KEY - - key = "bad" - confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key) + confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, 'badkey') response = self.app.get(confirm_url) assert response.status == '302 Found' assert response.location.endswith(pwd_reset_url) + assert_session_flash(response, 'Given reset token is invalid') + + response.follow() # cleanup flash # GOOD KEY + key = UserApiKeys.query()\ + .filter(UserApiKeys.user_id == user_id)\ + .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\ + .first() - key = User.get_by_username(username).api_key - confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key) + assert key + + confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key.api_key) response = self.app.get(confirm_url) assert response.status == '302 Found' assert response.location.endswith(login_url) @@ -431,7 +417,7 @@ class TestLoginController: 'Your password reset was successful, ' 'a new password has been sent to your email') - response = response.follow() + response.follow() def _get_api_whitelist(self, values=None): config = {'api_access_controllers_whitelist': values or []} @@ -522,70 +508,3 @@ class TestLoginController: repo_name=HG_REPO, revision='tip', api_key=new_auth_token.api_key), status=302) - - -class TestPasswordReset(TestController): - - @pytest.mark.parametrize( - 'pwd_reset_setting, 
show_link, show_reset', [ - ('hg.password_reset.enabled', True, True), - ('hg.password_reset.hidden', False, True), - ('hg.password_reset.disabled', False, False), - ]) - def test_password_reset_settings( - self, pwd_reset_setting, show_link, show_reset): - clear_all_caches() - self.log_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS) - params = { - 'csrf_token': self.csrf_token, - 'anonymous': 'True', - 'default_register': 'hg.register.auto_activate', - 'default_register_message': '', - 'default_password_reset': pwd_reset_setting, - 'default_extern_activate': 'hg.extern_activate.auto', - } - resp = self.app.post(url('admin_permissions_application'), params=params) - self.logout_user() - - login_page = self.app.get(login_url) - asr_login = AssertResponse(login_page) - index_page = self.app.get(index_url) - asr_index = AssertResponse(index_page) - - if show_link: - asr_login.one_element_exists('a.pwd_reset') - asr_index.one_element_exists('a.pwd_reset') - else: - asr_login.no_element_exists('a.pwd_reset') - asr_index.no_element_exists('a.pwd_reset') - - pwdreset_page = self.app.get(pwd_reset_url) - - asr_reset = AssertResponse(pwdreset_page) - if show_reset: - assert 'Send password reset email' in pwdreset_page - asr_reset.one_element_exists('#email') - asr_reset.one_element_exists('#send') - else: - assert 'Password reset is disabled.' 
in pwdreset_page - asr_reset.no_element_exists('#email') - asr_reset.no_element_exists('#send') - - def test_password_form_disabled(self): - self.log_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS) - params = { - 'csrf_token': self.csrf_token, - 'anonymous': 'True', - 'default_register': 'hg.register.auto_activate', - 'default_register_message': '', - 'default_password_reset': 'hg.password_reset.disabled', - 'default_extern_activate': 'hg.extern_activate.auto', - } - self.app.post(url('admin_permissions_application'), params=params) - self.logout_user() - - pwdreset_page = self.app.post( - pwd_reset_url, - {'email': 'lisa@rhodecode.com',} - ) - assert 'Password reset is disabled.' in pwdreset_page diff --git a/rhodecode/tests/functional/test_password_reset.py b/rhodecode/tests/functional/test_password_reset.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/functional/test_password_reset.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2010-2017 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import pytest + +from rhodecode.config.routing import ADMIN_PREFIX +from rhodecode.tests import ( + TestController, clear_all_caches, url, + TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS) +from rhodecode.tests.fixture import Fixture +from rhodecode.tests.utils import AssertResponse + +fixture = Fixture() + +# Hardcode URLs because we don't have a request object to use +# pyramids URL generation methods. +index_url = '/' +login_url = ADMIN_PREFIX + '/login' +logut_url = ADMIN_PREFIX + '/logout' +register_url = ADMIN_PREFIX + '/register' +pwd_reset_url = ADMIN_PREFIX + '/password_reset' +pwd_reset_confirm_url = ADMIN_PREFIX + '/password_reset_confirmation' + + +class TestPasswordReset(TestController): + + @pytest.mark.parametrize( + 'pwd_reset_setting, show_link, show_reset', [ + ('hg.password_reset.enabled', True, True), + ('hg.password_reset.hidden', False, True), + ('hg.password_reset.disabled', False, False), + ]) + def test_password_reset_settings( + self, pwd_reset_setting, show_link, show_reset): + clear_all_caches() + self.log_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS) + params = { + 'csrf_token': self.csrf_token, + 'anonymous': 'True', + 'default_register': 'hg.register.auto_activate', + 'default_register_message': '', + 'default_password_reset': pwd_reset_setting, + 'default_extern_activate': 'hg.extern_activate.auto', + } + resp = self.app.post(url('admin_permissions_application'), params=params) + self.logout_user() + + login_page = self.app.get(login_url) + asr_login = AssertResponse(login_page) + index_page = self.app.get(index_url) + asr_index = AssertResponse(index_page) + + if show_link: + asr_login.one_element_exists('a.pwd_reset') + asr_index.one_element_exists('a.pwd_reset') + else: + asr_login.no_element_exists('a.pwd_reset') + 
asr_index.no_element_exists('a.pwd_reset') + + response = self.app.get(pwd_reset_url) + + assert_response = AssertResponse(response) + if show_reset: + response.mustcontain('Send password reset email') + assert_response.one_element_exists('#email') + assert_response.one_element_exists('#send') + else: + response.mustcontain('Password reset is disabled.') + assert_response.no_element_exists('#email') + assert_response.no_element_exists('#send') + + def test_password_form_disabled(self): + self.log_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS) + params = { + 'csrf_token': self.csrf_token, + 'anonymous': 'True', + 'default_register': 'hg.register.auto_activate', + 'default_register_message': '', + 'default_password_reset': 'hg.password_reset.disabled', + 'default_extern_activate': 'hg.extern_activate.auto', + } + self.app.post(url('admin_permissions_application'), params=params) + self.logout_user() + + response = self.app.post( + pwd_reset_url, {'email': 'lisa@rhodecode.com',} + ) + response = response.follow() + response.mustcontain('Password reset is disabled.')