auth_rhodecode.py
228 lines
| 8.9 KiB
| text/x-python
|
PythonLexer
# Copyright (C) 2012-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
RhodeCode authentication plugin for built-in internal auth
"""
import logging | ||||
r3386 | import colander | |||
r2098 | from rhodecode.translation import _ | |||
r5057 | from rhodecode.lib.utils2 import safe_bytes | |||
r1 | from rhodecode.model.db import User | |||
r3392 | from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase | |||
from rhodecode.authentication.base import ( | ||||
RhodeCodeAuthPluginBase, hybrid_property, HTTP_TYPE, VCS_TYPE) | ||||
from rhodecode.authentication.routes import AuthnPluginResourceBase | ||||
r1 | ||||
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
def plugin_factory(plugin_id, *args, **kwargs):
    """
    Build the internal RhodeCode authentication plugin.

    :param plugin_id: identifier handed to the plugin constructor.
    :param args: accepted for call compatibility, not used.
    :param kwargs: accepted for call compatibility, not used.
    :return: a new `RhodeCodeAuthPlugin` instance.
    """
    return RhodeCodeAuthPlugin(plugin_id)
class RhodecodeAuthnResource(AuthnPluginResourceBase):
    """
    Resource type of the internal RhodeCode authentication plugin.

    Adds nothing on top of `AuthnPluginResourceBase`; it exists as a distinct
    class so it can be used as a view `context`.
    """
class RhodeCodeAuthPlugin(RhodeCodeAuthPluginBase):
    """
    Built-in authentication plugin that validates credentials against the
    password hash stored on the user object handed to `auth()`.
    """
    uid = 'rhodecode'

    # values the `auth_restriction` plugin setting is compared against
    AUTH_RESTRICTION_NONE = 'user_all'
    AUTH_RESTRICTION_SUPER_ADMIN = 'user_super_admin'

    # values the `scope_restriction` plugin setting is compared against
    AUTH_RESTRICTION_SCOPE_ALL = 'scope_all'
    AUTH_RESTRICTION_SCOPE_HTTP = 'scope_http'
    AUTH_RESTRICTION_SCOPE_VCS = 'scope_vcs'

    def includeme(self, config):
        """
        Register this plugin, its resource and the GET/POST settings views
        on the given configurator.

        :param config: configurator object exposing `add_authn_plugin`,
            `add_authn_resource` and `add_view`.
        """
        config.add_authn_plugin(self)
        config.add_authn_resource(self.get_id(), RhodecodeAuthnResource(self))
        config.add_view(
            'rhodecode.authentication.views.AuthnPluginViewBase',
            attr='settings_get',
            renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
            request_method='GET',
            route_name='auth_home',
            context=RhodecodeAuthnResource)
        config.add_view(
            'rhodecode.authentication.views.AuthnPluginViewBase',
            attr='settings_post',
            renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
            request_method='POST',
            route_name='auth_home',
            context=RhodecodeAuthnResource)

    def get_settings_schema(self):
        """Return a fresh settings schema instance for this plugin."""
        return RhodeCodeSettingsSchema()

    def get_display_name(self, load_from_settings=False):
        """
        Return the translated, human readable plugin name.

        :param load_from_settings: accepted for interface compatibility;
            this plugin always returns the same fixed name.
        """
        return _('RhodeCode Internal')

    @classmethod
    def docs(cls):
        """Return the URL of the documentation page for this plugin."""
        return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth.html"

    @hybrid_property
    def name(self):
        """Plugin name; `auth()` compares it with the user's `extern_type`."""
        return "rhodecode"

    def user_activation_state(self):
        """
        Tell whether the default user's global permissions contain
        `hg.register.auto_activate`.
        """
        def_user_perms = User.get_default_user().AuthUser().permissions['global']
        return 'hg.register.auto_activate' in def_user_perms

    def allows_authentication_from(
            self, user, allows_non_existing_user=True,
            allowed_auth_plugins=None, allowed_auth_sources=None):
        """
        Custom method for this auth that doesn't accept non existing users.
        We know that user exists in our database.

        The `allows_non_existing_user` argument is always overridden with
        `False`; `allowed_auth_plugins` and `allowed_auth_sources` are not
        forwarded to the base implementation.
        """
        allows_non_existing_user = False
        return super().allows_authentication_from(
            user, allows_non_existing_user=allows_non_existing_user)

    def auth(self, userobj, username, password, settings, **kwargs):
        """
        Authenticate `userobj` with the given credentials.

        :param userobj: user object looked up by the caller, may be empty.
        :param username: login name or e-mail entered by the user.
        :param password: clear-text password entered by the user.
        :param settings: mapping of plugin settings; `scope_restriction` and
            `auth_restriction` are read from it.
        :return: dict of user attributes on success, `None` when the user is
            missing, belongs to another plugin, is blocked by a restriction,
            is inactive or supplied wrong credentials.
        """
        if not userobj:
            log.debug('userobj was:%s skipping', userobj)
            return None

        if userobj.extern_type != self.name:
            log.warning("userobj:%s extern_type mismatch got:`%s` expected:`%s`",
                        userobj, userobj.extern_type, self.name)
            return None

        # check scope of auth
        scope_restriction = settings.get('scope_restriction', '')

        if scope_restriction == self.AUTH_RESTRICTION_SCOPE_HTTP \
                and self.auth_type != HTTP_TYPE:
            log.warning("userobj:%s tried scope type %s and scope restriction is set to %s",
                        userobj, self.auth_type, scope_restriction)
            return None

        if scope_restriction == self.AUTH_RESTRICTION_SCOPE_VCS \
                and self.auth_type != VCS_TYPE:
            log.warning("userobj:%s tried scope type %s and scope restriction is set to %s",
                        userobj, self.auth_type, scope_restriction)
            return None

        # check super-admin restriction
        auth_restriction = settings.get('auth_restriction', '')

        # Fail closed: any falsy `admin` value (False, None, 0) is treated as
        # "not a super-admin". An identity check against `False` would let an
        # account with an unset (`None`) admin flag bypass the restriction.
        if auth_restriction == self.AUTH_RESTRICTION_SUPER_ADMIN \
                and not userobj.admin:
            log.warning("userobj:%s is not super-admin and auth restriction is set to %s",
                        userobj, auth_restriction)
            return None

        user_attrs = {
            "username": userobj.username,
            "firstname": userobj.firstname,
            "lastname": userobj.lastname,
            "groups": [],
            'user_group_sync': False,
            "email": userobj.email,
            "admin": userobj.admin,
            "active": userobj.active,
            "active_from_extern": userobj.active,
            "extern_name": userobj.user_id,
            "extern_type": userobj.extern_type,
        }

        log.debug("User attributes:%s", user_attrs)

        if not userobj.active:
            log.warning('user `%s` failed to authenticate via %s, reason: account not '
                        'active.', username, self.name)
            return None

        # imported here rather than at module level, as in the original code
        from rhodecode.lib import auth
        crypto_backend = auth.crypto_backend()
        password_encoded = safe_bytes(password)
        password_match, new_hash = crypto_backend.hash_check_with_upgrade(
            password_encoded, userobj.password or '')

        if password_match and new_hash:
            log.debug('user %s properly authenticated, but '
                      'requires hash change to bcrypt', userobj)
            # if password match, and we use OLD deprecated hash,
            # we should migrate this user hash password to the new hash
            # we store the new returned by hash_check_with_upgrade function
            user_attrs['_hash_migrate'] = new_hash

        # the default (anonymous) user is accepted without a password match
        if userobj.username == User.DEFAULT_USER:
            log.info('user `%s` authenticated correctly as anonymous user',
                     userobj.username,
                     extra={"action": "user_auth_ok", "auth_module": "auth_rhodecode_anon", "username": userobj.username})
            return user_attrs

        # regular users may log in with either their username or their e-mail
        if (userobj.username == username or userobj.email == username) and password_match:
            log.info('user `%s` authenticated correctly', userobj.username,
                     extra={"action": "user_auth_ok", "auth_module": "auth_rhodecode", "username": userobj.username})
            return user_attrs

        log.warning("user `%s` used a wrong password when "
                    "authenticating on this plugin", userobj.username)
        return None
r3240 | ||||
class RhodeCodeSettingsSchema(AuthnPluginSettingsSchemaBase):
    """
    Settings schema of the internal RhodeCode authentication plugin.

    Adds a global 2FA switch plus the user and scope restriction selects on
    top of `AuthnPluginSettingsSchemaBase`.
    """
    global_2fa = colander.SchemaNode(
        colander.Bool(),
        default=False,
        description=_('Force all users to use two factor authentication by enabling this.'),
        missing=False,
        title=_('Global 2FA'),
        widget='bool',
    )

    # (stored value, label) pairs offered by the select widgets below
    auth_restriction_choices = [
        (RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE, 'All users'),
        (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN, 'Super admins only'),
    ]

    auth_scope_choices = [
        (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL, 'HTTP and VCS'),
        (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_HTTP, 'HTTP only'),
    ]

    auth_restriction = colander.SchemaNode(
        colander.String(),
        # the default must be the stored value, not the (value, label) pair,
        # so that it is a string accepted by the `OneOf` validator below
        default=auth_restriction_choices[0][0],
        description=_('Allowed user types for authentication using this plugin.'),
        title=_('User restriction'),
        validator=colander.OneOf([x[0] for x in auth_restriction_choices]),
        widget='select_with_labels',
        choices=auth_restriction_choices
    )

    scope_restriction = colander.SchemaNode(
        colander.String(),
        # same as above: use the value part of the first choice
        default=auth_scope_choices[0][0],
        description=_('Allowed protocols for authentication using this plugin. '
                      'VCS means GIT/HG/SVN. HTTP is web based login.'),
        title=_('Scope restriction'),
        validator=colander.OneOf([x[0] for x in auth_scope_choices]),
        widget='select_with_labels',
        choices=auth_scope_choices
    )
def includeme(config):
    """
    Create the internal auth plugin and let it register itself on `config`.

    :param config: configurator object passed on to the plugin's own
        `includeme` method.
    """
    plugin = plugin_factory(
        f'egg:rhodecode-enterprise-ce#{RhodeCodeAuthPlugin.uid}')
    plugin.includeme(config)