# Copyright (C) 2016-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import time import json import pyotp import qrcode import collections import datetime import formencode import formencode.htmlfill import logging import urllib.parse import requests from io import BytesIO from base64 import b64encode from pyramid.renderers import render from pyramid.response import Response from pyramid.httpexceptions import HTTPFound import rhodecode from rhodecode.apps._base import BaseAppView from rhodecode.authentication.base import authenticate, HTTP_TYPE from rhodecode.authentication.plugins import auth_rhodecode from rhodecode.events import UserRegistered, trigger from rhodecode.lib import helpers as h from rhodecode.lib import audit_logger from rhodecode.lib.auth import ( AuthUser, HasPermissionAnyDecorator, CSRFRequired, LoginRequired, NotAnonymous) from rhodecode.lib.base import get_ip_addr from rhodecode.lib.exceptions import UserCreationError from rhodecode.lib.utils2 import safe_str from rhodecode.model.db import User, UserApiKeys from rhodecode.model.forms import LoginForm, RegisterForm, PasswordResetForm, TOTPForm from rhodecode.model.meta import Session from rhodecode.model.auth_token import AuthTokenModel from rhodecode.model.settings import SettingsModel from rhodecode.model.user import UserModel from rhodecode.translation import _ log = logging.getLogger(__name__) CaptchaData = collections.namedtuple( 'CaptchaData', 'active, private_key, public_key') def store_user_in_session(session, user_identifier, remember=False): user = User.get_by_username_or_primary_email(user_identifier) auth_user = AuthUser(user.user_id) auth_user.set_authenticated() cs = auth_user.get_cookie_store() session['rhodecode_user'] = cs user.update_lastlogin() Session().commit() # If they want to be remembered, update the cookie if remember: _year = (datetime.datetime.now() + datetime.timedelta(seconds=60 * 60 * 24 * 365)) session._set_cookie_expires(_year) session.save() safe_cs = cs.copy() safe_cs['password'] = '****' log.info('user %s is now authenticated and stored in ' 'session, session attrs %s', user_identifier, safe_cs) # dumps session attrs back to cookie session._update_cookie_out() # we set new cookie headers = None if session.request['set_cookie']: # send set-cookie headers back to response to update cookie headers = [('Set-Cookie', session.request['cookie_out'])] return headers def get_came_from(request): came_from = safe_str(request.GET.get('came_from', '')) parsed = urllib.parse.urlparse(came_from) allowed_schemes = ['http', 'https'] default_came_from = h.route_path('home') if parsed.scheme and parsed.scheme not in allowed_schemes: log.error('Suspicious URL scheme detected %s for url %s', parsed.scheme, parsed) came_from = default_came_from elif parsed.netloc and request.host != parsed.netloc: log.error('Suspicious NETLOC detected %s for url %s server url ' 'is: %s', parsed.netloc, parsed, request.host) came_from = default_came_from elif any(bad_char in came_from for bad_char in ('\r', '\n')): log.error('Header injection detected `%s` for url %s server url ', parsed.path, parsed) came_from = default_came_from return came_from or default_came_from class LoginView(BaseAppView): def load_default_context(self): c = self._get_local_tmpl_context() c.came_from = get_came_from(self.request) return c def _get_captcha_data(self): settings = SettingsModel().get_all_settings() private_key = settings.get('rhodecode_captcha_private_key') public_key = settings.get('rhodecode_captcha_public_key') active = bool(private_key) return CaptchaData( active=active, private_key=private_key, public_key=public_key) def validate_captcha(self, private_key): captcha_rs = self.request.POST.get('g-recaptcha-response') url = "https://www.google.com/recaptcha/api/siteverify" params = { 'secret': private_key, 'response': captcha_rs, 'remoteip': get_ip_addr(self.request.environ) } verify_rs = requests.get(url, params=params, verify=True, timeout=60) verify_rs = verify_rs.json() captcha_status = verify_rs.get('success', False) captcha_errors = verify_rs.get('error-codes', []) if not isinstance(captcha_errors, list): captcha_errors = [captcha_errors] captcha_errors = ', '.join(captcha_errors) captcha_message = '' if captcha_status is False: captcha_message = "Bad captcha. Errors: {}".format( captcha_errors) return captcha_status, captcha_message def login(self): c = self.load_default_context() auth_user = self._rhodecode_user # redirect if already logged in if (auth_user.is_authenticated and not auth_user.is_default and auth_user.ip_allowed): raise HTTPFound(c.came_from) # check if we use headers plugin, and try to login using it. try: log.debug('Running PRE-AUTH for headers based authentication') auth_info = authenticate( '', '', self.request.environ, HTTP_TYPE, skip_missing=True) if auth_info: headers = store_user_in_session( self.session, auth_info.get('username')) raise HTTPFound(c.came_from, headers=headers) except UserCreationError as e: log.error(e) h.flash(e, category='error') return self._get_template_context(c) def login_post(self): c = self.load_default_context() login_form = LoginForm(self.request.translate)() try: self.session.invalidate() form_result = login_form.to_python(self.request.POST) # form checks for username/password, now we're authenticated username = form_result['username'] if (user := User.get_by_username_or_primary_email(username)).has_enabled_2fa: user.check_2fa_required = True headers = store_user_in_session( self.session, user_identifier=username, remember=form_result['remember']) log.debug('Redirecting to "%s" after login.', c.came_from) audit_user = audit_logger.UserWrap( username=self.request.POST.get('username'), ip_addr=self.request.remote_addr) action_data = {'user_agent': self.request.user_agent} audit_logger.store_web( 'user.login.success', action_data=action_data, user=audit_user, commit=True) raise HTTPFound(c.came_from, headers=headers) except formencode.Invalid as errors: defaults = errors.value # remove password from filling in form again defaults.pop('password', None) render_ctx = { 'errors': errors.error_dict, 'defaults': defaults, } audit_user = audit_logger.UserWrap( username=self.request.POST.get('username'), ip_addr=self.request.remote_addr) action_data = {'user_agent': self.request.user_agent} audit_logger.store_web( 'user.login.failure', action_data=action_data, user=audit_user, commit=True) return self._get_template_context(c, **render_ctx) except UserCreationError as e: # headers auth or other auth functions that create users on # the fly can throw this exception signaling that there's issue # with user creation, explanation should be provided in # Exception itself h.flash(e, category='error') return self._get_template_context(c) @CSRFRequired() def logout(self): auth_user = self._rhodecode_user log.info('Deleting session for user: `%s`', auth_user) action_data = {'user_agent': self.request.user_agent} audit_logger.store_web( 'user.logout', action_data=action_data, user=auth_user, commit=True) self.session.delete() return HTTPFound(h.route_path('home')) @HasPermissionAnyDecorator( 'hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate') def register(self, defaults=None, errors=None): c = self.load_default_context() defaults = defaults or {} errors = errors or {} settings = SettingsModel().get_all_settings() register_message = settings.get('rhodecode_register_message') or '' captcha = self._get_captcha_data() auto_active = 'hg.register.auto_activate' in User.get_default_user()\ .AuthUser().permissions['global'] render_ctx = self._get_template_context(c) render_ctx.update({ 'defaults': defaults, 'errors': errors, 'auto_active': auto_active, 'captcha_active': captcha.active, 'captcha_public_key': captcha.public_key, 'register_message': register_message, }) return render_ctx @HasPermissionAnyDecorator( 'hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate') def register_post(self): from rhodecode.authentication.plugins import auth_rhodecode self.load_default_context() captcha = self._get_captcha_data() auto_active = 'hg.register.auto_activate' in User.get_default_user()\ .AuthUser().permissions['global'] extern_name = auth_rhodecode.RhodeCodeAuthPlugin.uid extern_type = auth_rhodecode.RhodeCodeAuthPlugin.uid register_form = RegisterForm(self.request.translate)() try: form_result = register_form.to_python(self.request.POST) form_result['active'] = auto_active external_identity = self.request.POST.get('external_identity') if external_identity: extern_name = external_identity extern_type = external_identity if captcha.active: captcha_status, captcha_message = self.validate_captcha( captcha.private_key) if not captcha_status: _value = form_result _msg = _('Bad captcha') error_dict = {'recaptcha_field': captcha_message} raise formencode.Invalid( _msg, _value, None, error_dict=error_dict) new_user = UserModel().create_registration( form_result, extern_name=extern_name, extern_type=extern_type) action_data = {'data': new_user.get_api_data(), 'user_agent': self.request.user_agent} if external_identity: action_data['external_identity'] = external_identity audit_user = audit_logger.UserWrap( username=new_user.username, user_id=new_user.user_id, ip_addr=self.request.remote_addr) audit_logger.store_web( 'user.register', action_data=action_data, user=audit_user) event = UserRegistered(user=new_user, session=self.session) trigger(event) h.flash( _('You have successfully registered with RhodeCode. You can log-in now.'), category='success') if external_identity: h.flash( _('Please use the {identity} button to log-in').format( identity=external_identity), category='success') Session().commit() redirect_ro = self.request.route_path('login') raise HTTPFound(redirect_ro) except formencode.Invalid as errors: errors.value.pop('password', None) errors.value.pop('password_confirmation', None) return self.register( defaults=errors.value, errors=errors.error_dict) except UserCreationError as e: # container auth or other auth functions that create users on # the fly can throw this exception signaling that there's issue # with user creation, explanation should be provided in # Exception itself h.flash(e, category='error') return self.register() def password_reset(self): c = self.load_default_context() captcha = self._get_captcha_data() template_context = { 'captcha_active': captcha.active, 'captcha_public_key': captcha.public_key, 'defaults': {}, 'errors': {}, } # always send implicit message to prevent from discovery of # matching emails msg = _('If such email exists, a password reset link was sent to it.') def default_response(): log.debug('faking response on invalid password reset') # make this take 2s, to prevent brute forcing. time.sleep(2) h.flash(msg, category='success') return HTTPFound(self.request.route_path('reset_password')) if self.request.POST: if h.HasPermissionAny('hg.password_reset.disabled')(): _email = self.request.POST.get('email', '') log.error('Failed attempt to reset password for `%s`.', _email) h.flash(_('Password reset has been disabled.'), category='error') return HTTPFound(self.request.route_path('reset_password')) password_reset_form = PasswordResetForm(self.request.translate)() description = 'Generated token for password reset from {}'.format( datetime.datetime.now().isoformat()) try: form_result = password_reset_form.to_python( self.request.POST) user_email = form_result['email'] if captcha.active: captcha_status, captcha_message = self.validate_captcha( captcha.private_key) if not captcha_status: _value = form_result _msg = _('Bad captcha') error_dict = {'recaptcha_field': captcha_message} raise formencode.Invalid( _msg, _value, None, error_dict=error_dict) # Generate reset URL and send mail. user = User.get_by_email(user_email) # only allow rhodecode based users to reset their password # external auth shouldn't allow password reset if user and user.extern_type != auth_rhodecode.RhodeCodeAuthPlugin.uid: log.warning('User %s with external type `%s` tried a password reset. ' 'This try was rejected', user, user.extern_type) return default_response() # generate password reset token that expires in 10 minutes reset_token = UserModel().add_auth_token( user=user, lifetime_minutes=10, role=UserModel.auth_token_role.ROLE_PASSWORD_RESET, description=description) Session().commit() log.debug('Successfully created password recovery token') password_reset_url = self.request.route_url( 'reset_password_confirmation', _query={'key': reset_token.api_key}) UserModel().reset_password_link( form_result, password_reset_url) action_data = {'email': user_email, 'user_agent': self.request.user_agent} audit_logger.store_web( 'user.password.reset_request', action_data=action_data, user=self._rhodecode_user, commit=True) return default_response() except formencode.Invalid as errors: template_context.update({ 'defaults': errors.value, 'errors': errors.error_dict, }) if not self.request.POST.get('email'): # case of empty email, we want to report that return self._get_template_context(c, **template_context) if 'recaptcha_field' in errors.error_dict: # case of failed captcha return self._get_template_context(c, **template_context) return default_response() return self._get_template_context(c, **template_context) def password_reset_confirmation(self): self.load_default_context() if key := self.request.GET.get('key'): # make this take 2s, to prevent brute forcing. time.sleep(2) token = AuthTokenModel().get_auth_token(key) # verify token is the correct role if token is None or token.role != UserApiKeys.ROLE_PASSWORD_RESET: log.debug('Got token with role:%s expected is %s', getattr(token, 'role', 'EMPTY_TOKEN'), UserApiKeys.ROLE_PASSWORD_RESET) h.flash( _('Given reset token is invalid'), category='error') return HTTPFound(self.request.route_path('reset_password')) try: owner = token.user data = {'email': owner.email, 'token': token.api_key} UserModel().reset_password(data) h.flash( _('Your password reset was successful, ' 'a new password has been sent to your email'), category='success') except Exception as e: log.error(e) return HTTPFound(self.request.route_path('reset_password')) return HTTPFound(self.request.route_path('login')) @LoginRequired() @NotAnonymous() def setup_2fa(self): _ = self.request.translate c = self.load_default_context() user_instance = self._rhodecode_db_user form = TOTPForm(_, user_instance)() render_ctx = {} if self.request.method == 'POST': post_items = dict(self.request.POST) try: form_details = form.to_python(post_items) secret = form_details['secret_totp'] user_instance.init_2fa_recovery_codes(persist=True, force=True) user_instance.secret_2fa = secret Session().commit() raise HTTPFound(self.request.route_path('my_account_configure_2fa', _query={'show-recovery-codes': 1})) except formencode.Invalid as errors: defaults = errors.value render_ctx = { 'errors': errors.error_dict, 'defaults': defaults, } # NOTE: here we DO NOT persist the secret 2FA, since this is only for setup, once a setup is completed # only then we should persist it secret = user_instance.init_secret_2fa(persist=False) instance_name = rhodecode.ConfigGet().get_str('app.base_url', 'rhodecode') totp_name = f'{instance_name}:{self.request.user.username}' qr = qrcode.QRCode(version=1, box_size=5, border=4) qr.add_data(pyotp.totp.TOTP(secret).provisioning_uri(name=totp_name)) qr.make(fit=True) img = qr.make_image(fill_color='black', back_color='white') buffered = BytesIO() img.save(buffered) return self._get_template_context( c, qr=b64encode(buffered.getvalue()).decode("utf-8"), key=secret, totp_name=totp_name, ** render_ctx ) @LoginRequired() @NotAnonymous() def verify_2fa(self): _ = self.request.translate c = self.load_default_context() render_ctx = {} user_instance = self._rhodecode_db_user totp_form = TOTPForm(_, user_instance, allow_recovery_code_use=True)() if self.request.method == 'POST': post_items = dict(self.request.POST) # NOTE: inject secret, as it's a post configured saved item. post_items['secret_totp'] = user_instance.secret_2fa try: totp_form.to_python(post_items) user_instance.check_2fa_required = False Session().commit() raise HTTPFound(c.came_from) except formencode.Invalid as errors: defaults = errors.value render_ctx = { 'errors': errors.error_dict, 'defaults': defaults, } return self._get_template_context(c, **render_ctx)