views.py
553 lines
| 21.6 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2016-2023 RhodeCode GmbH | |||
r1501 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import time | ||||
r5360 | import json | |||
import pyotp | ||||
import qrcode | ||||
r1501 | import collections | |||
import datetime | ||||
import formencode | ||||
r2080 | import formencode.htmlfill | |||
r1501 | import logging | |||
r4919 | import urllib.parse | |||
r2731 | import requests | |||
r5360 | from io import BytesIO | |||
from base64 import b64encode | ||||
r1501 | ||||
r5360 | from pyramid.renderers import render | |||
from pyramid.response import Response | ||||
r1501 | from pyramid.httpexceptions import HTTPFound | |||
r4610 | ||||
r5373 | import rhodecode | |||
r1536 | from rhodecode.apps._base import BaseAppView | |||
r1501 | from rhodecode.authentication.base import authenticate, HTTP_TYPE | |||
r3258 | from rhodecode.authentication.plugins import auth_rhodecode | |||
r2358 | from rhodecode.events import UserRegistered, trigger | |||
r1501 | from rhodecode.lib import helpers as h | |||
r1697 | from rhodecode.lib import audit_logger | |||
r1501 | from rhodecode.lib.auth import ( | |||
r5360 | AuthUser, HasPermissionAnyDecorator, CSRFRequired, LoginRequired, NotAnonymous) | |||
r1501 | from rhodecode.lib.base import get_ip_addr | |||
from rhodecode.lib.exceptions import UserCreationError | ||||
from rhodecode.lib.utils2 import safe_str | ||||
from rhodecode.model.db import User, UserApiKeys | ||||
r5360 | from rhodecode.model.forms import LoginForm, RegisterForm, PasswordResetForm, TOTPForm | |||
r1501 | from rhodecode.model.meta import Session | |||
from rhodecode.model.auth_token import AuthTokenModel | ||||
from rhodecode.model.settings import SettingsModel | ||||
from rhodecode.model.user import UserModel | ||||
from rhodecode.translation import _ | ||||
# Module-level logger used by every view and helper in this file.
log = logging.getLogger(__name__)

# Captcha configuration bundle: whether captcha is active, plus the
# private (secret) and public (site) keys.
CaptchaData = collections.namedtuple(
    'CaptchaData', ['active', 'private_key', 'public_key'])
def store_user_in_session(session, user_identifier, remember=False):
    """
    Mark the given user as authenticated and store it in the web session.

    :param session: session object; it is indexed by key, saved, and its
        ``_set_cookie_expires`` / ``_update_cookie_out`` hooks are called.
    :param user_identifier: value passed to
        ``User.get_by_username_or_primary_email`` to find the user.
    :param remember: when true, the session cookie expiry is pushed one
        year (365 days) into the future.
    :returns: a list with a single ``('Set-Cookie', ...)`` header tuple when
        the session reports a cookie to set, otherwise ``None``.
    """
    db_user = User.get_by_username_or_primary_email(user_identifier)
    authenticated = AuthUser(db_user.user_id)
    authenticated.set_authenticated()
    cookie_store = authenticated.get_cookie_store()
    session['rhodecode_user'] = cookie_store

    db_user.update_lastlogin()
    Session().commit()

    # If they want to be remembered, update the cookie
    if remember:
        one_year = datetime.timedelta(seconds=60 * 60 * 24 * 365)
        session._set_cookie_expires(datetime.datetime.now() + one_year)
    session.save()

    # never write the real password entry of the cookie store to the log
    masked_store = cookie_store.copy()
    masked_store['password'] = '****'
    log.info('user %s is now authenticated and stored in '
             'session, session attrs %s', user_identifier, masked_store)

    # dumps session attrs back to cookie
    session._update_cookie_out()

    if not session.request['set_cookie']:
        return None
    # send set-cookie headers back to response to update cookie
    return [('Set-Cookie', session.request['cookie_out'])]
def get_came_from(request):
    """
    Return a validated redirect target taken from the ``came_from`` query
    parameter of the request.

    The value is replaced by the ``home`` route path when any of these hold:

    * it carries a URL scheme other than ``http`` / ``https``
    * it carries a network location different from ``request.host``
    * it contains a carriage-return or line-feed character

    :param request: request object exposing ``GET`` and ``host``.
    :returns: the original ``came_from`` value when it passed all checks and
        is non-empty, otherwise the ``home`` route path.
    """
    came_from = safe_str(request.GET.get('came_from', ''))
    parsed = urllib.parse.urlparse(came_from)

    allowed_schemes = ['http', 'https']
    default_came_from = h.route_path('home')
    if parsed.scheme and parsed.scheme not in allowed_schemes:
        log.error('Suspicious URL scheme detected %s for url %s',
                  parsed.scheme, parsed)
        came_from = default_came_from
    elif parsed.netloc and request.host != parsed.netloc:
        log.error('Suspicious NETLOC detected %s for url %s server url '
                  'is: %s', parsed.netloc, parsed, request.host)
        came_from = default_came_from
    elif any(bad_char in came_from for bad_char in ('\r', '\n')):
        # checked on the raw string, not on the parsed result
        log.error('Header injection detected `%s` for url %s',
                  parsed.path, parsed)
        came_from = default_came_from

    return came_from or default_came_from
r1501 | ||||
class LoginView(BaseAppView):
    """
    Views for login, logout, user registration, password reset and
    two-factor (TOTP) setup and verification.
    """

    def load_default_context(self):
        """
        Build the local template context and attach the validated
        ``came_from`` redirect target to it.
        """
        c = self._get_local_tmpl_context()
        c.came_from = get_came_from(self.request)
        return c

    def _get_captcha_data(self):
        """
        Read the captcha keys from the application settings.

        :returns: ``CaptchaData``; the captcha counts as active when a
            private key is configured.
        """
        settings = SettingsModel().get_all_settings()
        private_key = settings.get('rhodecode_captcha_private_key')
        public_key = settings.get('rhodecode_captcha_public_key')
        active = bool(private_key)
        return CaptchaData(
            active=active, private_key=private_key, public_key=public_key)

    def _registration_auto_active(self):
        """
        Tell whether the default user holds the global
        ``hg.register.auto_activate`` permission.
        """
        default_perms = User.get_default_user().AuthUser().permissions
        return 'hg.register.auto_activate' in default_perms['global']

    def _audit_login_attempt(self, action):
        """
        Store an audit log entry for a login attempt.

        :param action: audit action name, e.g. ``user.login.success``.
        """
        audit_user = audit_logger.UserWrap(
            username=self.request.POST.get('username'),
            ip_addr=self.request.remote_addr)
        action_data = {'user_agent': self.request.user_agent}
        audit_logger.store_web(
            action, action_data=action_data,
            user=audit_user, commit=True)

    def _require_valid_captcha(self, captcha, form_result):
        """
        Validate the submitted captcha when captcha is active.

        :param captcha: ``CaptchaData`` as returned by
            ``_get_captcha_data``.
        :param form_result: validated form values, attached to the raised
            error as its value.
        :raises formencode.Invalid: when the captcha did not verify; the
            error dict carries the message under ``recaptcha_field``.
        """
        if not captcha.active:
            return
        captcha_status, captcha_message = self.validate_captcha(
            captcha.private_key)
        if not captcha_status:
            error_dict = {'recaptcha_field': captcha_message}
            raise formencode.Invalid(
                _('Bad captcha'), form_result, None, error_dict=error_dict)

    def validate_captcha(self, private_key):
        """
        Verify the posted ``g-recaptcha-response`` value against the
        reCAPTCHA ``siteverify`` endpoint.

        :param private_key: captcha secret sent with the verification
            request.
        :returns: tuple ``(captcha_status, captcha_message)``; the message
            is an empty string when verification succeeded. A failed or
            unparsable verification request is reported as a failed
            captcha instead of raising.
        """
        captcha_rs = self.request.POST.get('g-recaptcha-response')
        url = "https://www.google.com/recaptcha/api/siteverify"
        params = {
            'secret': private_key,
            'response': captcha_rs,
            'remoteip': get_ip_addr(self.request.environ)
        }
        try:
            # POST keeps the secret in the request body instead of the URL
            verify_rs = requests.post(
                url, data=params, verify=True, timeout=60)
            verify_rs = verify_rs.json()
        except (requests.RequestException, ValueError):
            # network failure or non-JSON answer: treat as failed captcha
            log.exception('Captcha verification request failed')
            return False, "Bad captcha. Errors: {}".format(
                'verification-request-failed')

        captcha_status = verify_rs.get('success', False)
        captcha_errors = verify_rs.get('error-codes', [])
        if not isinstance(captcha_errors, list):
            captcha_errors = [captcha_errors]
        captcha_errors = ', '.join(str(err) for err in captcha_errors)
        captcha_message = ''
        if not captcha_status:
            captcha_message = "Bad captcha. Errors: {}".format(
                captcha_errors)

        return captcha_status, captcha_message

    def login(self):
        """
        Render the login page.

        Redirects to ``came_from`` when the current user is already
        authenticated, or when header based pre-authentication succeeds.
        """
        c = self.load_default_context()
        auth_user = self._rhodecode_user

        # redirect if already logged in
        if (auth_user.is_authenticated and
                not auth_user.is_default and auth_user.ip_allowed):
            raise HTTPFound(c.came_from)

        # check if we use headers plugin, and try to login using it.
        try:
            log.debug('Running PRE-AUTH for headers based authentication')
            auth_info = authenticate(
                '', '', self.request.environ, HTTP_TYPE, skip_missing=True)
            if auth_info:
                headers = store_user_in_session(
                    self.session, auth_info.get('username'))
                raise HTTPFound(c.came_from, headers=headers)
        except UserCreationError as e:
            log.error(e)
            h.flash(e, category='error')

        return self._get_template_context(c)

    def login_post(self):
        """
        Process a submitted login form.

        On success the user is stored in the session, an audit entry is
        written and the client is redirected to ``came_from``. On form
        validation failure the login page is rendered again with errors
        and a failure audit entry is written.
        """
        c = self.load_default_context()

        login_form = LoginForm(self.request.translate)()

        try:
            self.session.invalidate()
            form_result = login_form.to_python(self.request.POST)
            # form checks for username/password, now we're authenticated
            username = form_result['username']
            if (user := User.get_by_username_or_primary_email(username)).has_enabled_2fa:
                user.check_2fa_required = True

            headers = store_user_in_session(
                self.session,
                user_identifier=username,
                remember=form_result['remember'])
            log.debug('Redirecting to "%s" after login.', c.came_from)

            self._audit_login_attempt('user.login.success')

            raise HTTPFound(c.came_from, headers=headers)
        except formencode.Invalid as errors:
            defaults = errors.value
            # remove password from filling in form again
            defaults.pop('password', None)
            render_ctx = {
                'errors': errors.error_dict,
                'defaults': defaults,
            }

            self._audit_login_attempt('user.login.failure')
            return self._get_template_context(c, **render_ctx)

        except UserCreationError as e:
            # headers auth or other auth functions that create users on
            # the fly can throw this exception signaling that there's issue
            # with user creation, explanation should be provided in
            # Exception itself
            h.flash(e, category='error')
            return self._get_template_context(c)

    @CSRFRequired()
    def logout(self):
        """
        Write a logout audit entry, delete the session and redirect to
        the ``home`` route.
        """
        auth_user = self._rhodecode_user
        log.info('Deleting session for user: `%s`', auth_user)

        action_data = {'user_agent': self.request.user_agent}
        audit_logger.store_web(
            'user.logout', action_data=action_data,
            user=auth_user, commit=True)
        self.session.delete()
        return HTTPFound(h.route_path('home'))

    @HasPermissionAnyDecorator(
        'hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate')
    def register(self, defaults=None, errors=None):
        """
        Render the registration page.

        :param defaults: form values to pre-fill; empty when not given.
        :param errors: form errors to display; empty when not given.
        :returns: template context extended with captcha data, the
            registration message and the auto-activation flag.
        """
        c = self.load_default_context()
        defaults = defaults or {}
        errors = errors or {}

        settings = SettingsModel().get_all_settings()
        register_message = settings.get('rhodecode_register_message') or ''
        captcha = self._get_captcha_data()
        auto_active = self._registration_auto_active()

        render_ctx = self._get_template_context(c)
        render_ctx.update({
            'defaults': defaults,
            'errors': errors,
            'auto_active': auto_active,
            'captcha_active': captcha.active,
            'captcha_public_key': captcha.public_key,
            'register_message': register_message,
        })
        return render_ctx

    @HasPermissionAnyDecorator(
        'hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate')
    def register_post(self):
        """
        Process a submitted registration form.

        Creates the user, writes an audit entry, triggers the
        ``UserRegistered`` event and redirects to the ``login`` route.
        Validation errors (including a failed captcha) re-render the
        registration page with the password fields stripped.
        """
        self.load_default_context()
        captcha = self._get_captcha_data()
        auto_active = self._registration_auto_active()

        extern_name = auth_rhodecode.RhodeCodeAuthPlugin.uid
        extern_type = auth_rhodecode.RhodeCodeAuthPlugin.uid

        register_form = RegisterForm(self.request.translate)()
        try:
            form_result = register_form.to_python(self.request.POST)
            form_result['active'] = auto_active
            external_identity = self.request.POST.get('external_identity')
            if external_identity:
                extern_name = external_identity
                extern_type = external_identity

            self._require_valid_captcha(captcha, form_result)

            new_user = UserModel().create_registration(
                form_result, extern_name=extern_name, extern_type=extern_type)

            action_data = {'data': new_user.get_api_data(),
                           'user_agent': self.request.user_agent}

            if external_identity:
                action_data['external_identity'] = external_identity

            audit_user = audit_logger.UserWrap(
                username=new_user.username,
                user_id=new_user.user_id,
                ip_addr=self.request.remote_addr)

            # no commit here; the session is committed once further below
            audit_logger.store_web(
                'user.register', action_data=action_data,
                user=audit_user)

            event = UserRegistered(user=new_user, session=self.session)
            trigger(event)
            h.flash(
                _('You have successfully registered with RhodeCode. You can log-in now.'),
                category='success')
            if external_identity:
                h.flash(
                    _('Please use the {identity} button to log-in').format(
                        identity=external_identity),
                    category='success')
            Session().commit()

            redirect_ro = self.request.route_path('login')
            raise HTTPFound(redirect_ro)

        except formencode.Invalid as errors:
            # never send submitted passwords back to the form
            errors.value.pop('password', None)
            errors.value.pop('password_confirmation', None)
            return self.register(
                defaults=errors.value, errors=errors.error_dict)

        except UserCreationError as e:
            # container auth or other auth functions that create users on
            # the fly can throw this exception signaling that there's issue
            # with user creation, explanation should be provided in
            # Exception itself
            h.flash(e, category='error')
            return self.register()

    def password_reset(self):
        """
        Render the password reset page and process reset requests.

        On POST a password reset token valid for 10 minutes is created and
        the reset link is handed to ``UserModel().reset_password_link``.
        Apart from an empty email or a failed captcha, every outcome
        answers with the same delayed redirect and message, so the
        response does not reveal whether the email is known.
        """
        c = self.load_default_context()
        captcha = self._get_captcha_data()

        template_context = {
            'captcha_active': captcha.active,
            'captcha_public_key': captcha.public_key,
            'defaults': {},
            'errors': {},
        }

        # always send implicit message to prevent from discovery of
        # matching emails
        msg = _('If such email exists, a password reset link was sent to it.')

        def default_response():
            log.debug('faking response on invalid password reset')
            # make this take 2s, to prevent brute forcing.
            time.sleep(2)
            h.flash(msg, category='success')
            return HTTPFound(self.request.route_path('reset_password'))

        if self.request.POST:
            if h.HasPermissionAny('hg.password_reset.disabled')():
                _email = self.request.POST.get('email', '')
                log.error('Failed attempt to reset password for `%s`.', _email)
                h.flash(_('Password reset has been disabled.'), category='error')
                return HTTPFound(self.request.route_path('reset_password'))

            password_reset_form = PasswordResetForm(self.request.translate)()
            description = 'Generated token for password reset from {}'.format(
                datetime.datetime.now().isoformat())

            try:
                form_result = password_reset_form.to_python(
                    self.request.POST)
                user_email = form_result['email']

                self._require_valid_captcha(captcha, form_result)

                # Generate reset URL and send mail.
                user = User.get_by_email(user_email)

                if user is None:
                    # NOTE(review): the form presumably rejects unknown
                    # emails already; guard anyway so a missing user gets
                    # the same implicit answer instead of an error
                    log.warning('Password reset requested for an email '
                                'without a matching user')
                    return default_response()

                # only allow rhodecode based users to reset their password
                # external auth shouldn't allow password reset
                if user.extern_type != auth_rhodecode.RhodeCodeAuthPlugin.uid:
                    log.warning('User %s with external type `%s` tried a password reset. '
                                'This try was rejected', user, user.extern_type)
                    return default_response()

                # generate password reset token that expires in 10 minutes
                reset_token = UserModel().add_auth_token(
                    user=user, lifetime_minutes=10,
                    role=UserModel.auth_token_role.ROLE_PASSWORD_RESET,
                    description=description)
                Session().commit()

                log.debug('Successfully created password recovery token')
                password_reset_url = self.request.route_url(
                    'reset_password_confirmation',
                    _query={'key': reset_token.api_key})
                UserModel().reset_password_link(
                    form_result, password_reset_url)

                action_data = {'email': user_email,
                               'user_agent': self.request.user_agent}
                audit_logger.store_web(
                    'user.password.reset_request', action_data=action_data,
                    user=self._rhodecode_user, commit=True)

                return default_response()

            except formencode.Invalid as errors:
                template_context.update({
                    'defaults': errors.value,
                    'errors': errors.error_dict,
                })
                if not self.request.POST.get('email'):
                    # case of empty email, we want to report that
                    return self._get_template_context(c, **template_context)

                # error_dict may be None when the error carries no dict
                if 'recaptcha_field' in (errors.error_dict or {}):
                    # case of failed captcha
                    return self._get_template_context(c, **template_context)

                return default_response()

        return self._get_template_context(c, **template_context)

    def password_reset_confirmation(self):
        """
        Handle the link from a password reset email.

        When a ``key`` query parameter is present, the matching token must
        exist and carry the password-reset role; then the password of the
        token owner is reset. Invalid tokens and failures redirect to the
        ``reset_password`` route, everything else to ``login``.
        """
        self.load_default_context()

        if key := self.request.GET.get('key'):
            # make this take 2s, to prevent brute forcing.
            time.sleep(2)

            token = AuthTokenModel().get_auth_token(key)

            # verify token is the correct role
            if token is None or token.role != UserApiKeys.ROLE_PASSWORD_RESET:
                log.debug('Got token with role:%s expected is %s',
                          getattr(token, 'role', 'EMPTY_TOKEN'),
                          UserApiKeys.ROLE_PASSWORD_RESET)
                h.flash(
                    _('Given reset token is invalid'), category='error')
                return HTTPFound(self.request.route_path('reset_password'))

            try:
                owner = token.user
                data = {'email': owner.email, 'token': token.api_key}
                UserModel().reset_password(data)
                h.flash(
                    _('Your password reset was successful, '
                      'a new password has been sent to your email'),
                    category='success')
            except Exception:
                # keep the traceback in the log; the user is only redirected
                log.exception('Password reset using the given token failed')
                return HTTPFound(self.request.route_path('reset_password'))

        return HTTPFound(self.request.route_path('login'))

    @LoginRequired()
    @NotAnonymous()
    def setup_2fa(self):
        """
        Render the two-factor setup page with a QR code, and on POST
        validate the submitted TOTP form and store the secret.

        A successful POST (re)initialises the recovery codes, stores the
        secret on the user, commits and redirects to the
        ``my_account_configure_2fa`` route. Otherwise a fresh, not yet
        persisted secret is rendered as a base64 encoded QR image.
        """
        _ = self.request.translate
        c = self.load_default_context()
        user_instance = self._rhodecode_db_user
        form = TOTPForm(_, user_instance)()
        render_ctx = {}

        if self.request.method == 'POST':
            post_items = dict(self.request.POST)

            try:
                form_details = form.to_python(post_items)
                secret = form_details['secret_totp']

                user_instance.init_2fa_recovery_codes(persist=True, force=True)
                user_instance.secret_2fa = secret

                Session().commit()
                raise HTTPFound(self.request.route_path(
                    'my_account_configure_2fa',
                    _query={'show-recovery-codes': 1}))
            except formencode.Invalid as errors:
                defaults = errors.value
                render_ctx = {
                    'errors': errors.error_dict,
                    'defaults': defaults,
                }

        # NOTE: here we DO NOT persist the secret 2FA, since this is only for setup, once a setup is completed
        # only then we should persist it
        secret = user_instance.init_secret_2fa(persist=False)

        instance_name = rhodecode.ConfigGet().get_str('app.base_url', 'rhodecode')
        totp_name = f'{instance_name}:{self.request.user.username}'

        qr = qrcode.QRCode(version=1, box_size=5, border=4)
        qr.add_data(pyotp.totp.TOTP(secret).provisioning_uri(name=totp_name))
        qr.make(fit=True)
        img = qr.make_image(fill_color='black', back_color='white')
        buffered = BytesIO()
        img.save(buffered)

        return self._get_template_context(
            c,
            qr=b64encode(buffered.getvalue()).decode("utf-8"),
            key=secret,
            totp_name=totp_name,
            **render_ctx
        )

    @LoginRequired()
    @NotAnonymous()
    def verify_2fa(self):
        """
        Render the two-factor verification page and on POST validate the
        submitted code (recovery codes are allowed).

        On success the ``check_2fa_required`` flag of the user is cleared,
        the change is committed and the client is redirected to
        ``came_from``.
        """
        _ = self.request.translate
        c = self.load_default_context()
        render_ctx = {}
        user_instance = self._rhodecode_db_user
        totp_form = TOTPForm(_, user_instance, allow_recovery_code_use=True)()

        if self.request.method == 'POST':
            post_items = dict(self.request.POST)
            # NOTE: inject secret, as it's a post configured saved item.
            post_items['secret_totp'] = user_instance.secret_2fa
            try:
                totp_form.to_python(post_items)
                user_instance.check_2fa_required = False
                Session().commit()
                raise HTTPFound(c.came_from)
            except formencode.Invalid as errors:
                defaults = errors.value
                render_ctx = {
                    'errors': errors.error_dict,
                    'defaults': defaults,
                }

        return self._get_template_context(c, **render_ctx)