Show More
auth_headers.py
231 lines
| 8.3 KiB
| text/x-python
|
PythonLexer
# Copyright (C) 2012-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import colander | ||||
import logging | ||||
r1454 | from rhodecode.translation import _ | |||
from rhodecode.authentication.base import ( | ||||
RhodeCodeExternalAuthPlugin, hybrid_property) | ||||
r59 | from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase | |||
from rhodecode.authentication.routes import AuthnPluginResourceBase | ||||
from rhodecode.lib.colander_utils import strip_whitespace | ||||
r5057 | from rhodecode.lib.str_utils import safe_str | |||
from rhodecode.lib.utils2 import str2bool | ||||
r59 | from rhodecode.model.db import User | |||
log = logging.getLogger(__name__) | ||||
def plugin_factory(plugin_id, *args, **kwargs):
    """
    Build the plugin instance for the given plugin id.

    Called during plugin discovery. Extra positional and keyword
    arguments are accepted but not used.

    :param plugin_id: identifier handed to the plugin constructor.
    :return: a new ``RhodeCodeAuthPlugin`` instance.
    """
    return RhodeCodeAuthPlugin(plugin_id)
class HeadersAuthnResource(AuthnPluginResourceBase):
    """Resource type used as the view context for the headers plugin settings."""
class HeadersSettingsSchema(AuthnPluginSettingsSchemaBase):
    """Settings schema of the headers authentication plugin."""

    # Primary environ key the username is read from.
    header = colander.SchemaNode(
        colander.String(),
        title=_('Header'),
        description=_('Header to extract the user from'),
        default='REMOTE_USER',
        preparer=strip_whitespace,
        widget='string',
    )

    # Environ key consulted when the primary one yields no username.
    fallback_header = colander.SchemaNode(
        colander.String(),
        title=_('Fallback header'),
        description=_('Header to extract the user from when main one fails'),
        default='HTTP_X_FORWARDED_USER',
        preparer=strip_whitespace,
        widget='string',
    )

    # Toggles stripping of the `@...` suffix and `...\` prefix from the
    # extracted username.
    clean_username = colander.SchemaNode(
        colander.Boolean(),
        title=_('Clean username'),
        description=_('Perform cleaning of user, if passed user has @ in '
                      'username then first part before @ is taken. '
                      'If there\'s \\ in the username only the part after '
                      ' \\ is taken'),
        default=True,
        missing=False,
        widget='bool',
    )
class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
    """
    Authentication plugin that takes the username from the request environ.

    Which environ keys are consulted, and whether the value is cleaned of
    realm/domain parts, is driven by the plugin settings
    (``header``, ``fallback_header``, ``clean_username``).
    """
    uid = 'headers'

    def includeme(self, config):
        """
        Register this plugin, its resource and its settings views.

        :param config: configurator object exposing ``add_authn_plugin``,
            ``add_authn_resource`` and ``add_view``.
        """
        config.add_authn_plugin(self)
        config.add_authn_resource(self.get_id(), HeadersAuthnResource(self))

        # GET renders the settings form, POST stores it; both share the
        # same template, route and context.
        view_bindings = (
            ('settings_get', 'GET'),
            ('settings_post', 'POST'),
        )
        for view_attr, http_method in view_bindings:
            config.add_view(
                'rhodecode.authentication.views.AuthnPluginViewBase',
                attr=view_attr,
                renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
                request_method=http_method,
                route_name='auth_home',
                context=HeadersAuthnResource)

    def get_display_name(self, load_from_settings=False):
        """Return the translated display name; ``load_from_settings`` is unused."""
        return _('Headers')

    def get_settings_schema(self):
        """Return a fresh instance of the settings schema for this plugin."""
        return HeadersSettingsSchema()

    @hybrid_property
    def name(self):
        """Short name of this plugin."""
        return "headers"

    @property
    def is_headers_auth(self):
        """Always ``True`` for this plugin."""
        return True

    def use_fake_password(self):
        """Always ``True`` for this plugin."""
        return True

    def user_activation_state(self):
        """
        Tell whether the default user's global permissions contain
        ``hg.extern_activate.auto``.
        """
        default_user = User.get_default_user()
        global_perms = default_user.AuthUser().permissions['global']
        return 'hg.extern_activate.auto' in global_perms

    def _clean_username(self, username):
        """
        Strip realm and domain from a username.

        Keeps what precedes the first ``@`` and, of that, what follows the
        last backslash.
        """
        without_realm = username.partition('@')[0]
        return without_realm.rpartition('\\')[2]

    def _get_username(self, environ, settings):
        """
        Extract the username from ``environ`` according to ``settings``.

        The key named by ``settings['header']`` is tried first; when that
        gives nothing usable, ``settings['fallback_header']`` is tried.
        The value is cleaned when ``clean_username`` is enabled.

        :param environ: mapping to read the username from, may be empty/None.
        :param settings: plugin settings mapping, may be empty/None.
        :return: the extracted (optionally cleaned) username, or whatever
            falsy value the lookup produced when nothing was found.
        """
        environ = environ or {}
        settings = settings or {}
        if not environ:
            log.debug('got empty environ: %s', environ)

        username = None
        primary_key = settings.get('header')
        if primary_key:
            username = environ.get(primary_key)
            log.debug('extracted %s:%s', primary_key, username)

        # fallback mode
        if not username:
            fallback_key = settings.get('fallback_header')
            if fallback_key:
                username = environ.get(fallback_key)
                log.debug('extracted %s:%s', fallback_key, username)

        if not username or not str2bool(settings.get('clean_username')):
            return username

        log.debug('Received username `%s` from headers', username)
        cleaned = self._clean_username(username)
        log.debug('New cleanup user is:%s', cleaned)
        return cleaned

    def get_user(self, username=None, **kwargs):
        """
        Fetch the user identified by the request environ.

        The passed ``username`` is ignored; the name is always derived from
        the ``environ`` and ``settings`` keyword arguments and then handed
        to the parent class lookup.

        :param username: ignored by this plugin.
        :param kwargs: may carry ``environ`` and ``settings``.
        """
        header_username = self._get_username(
            kwargs.get('environ') or {},
            kwargs.get('settings') or {})
        # we got the username, so use default method now
        return super().get_user(header_username)

    def auth(self, userobj, username, password, settings, **kwargs):
        """
        Authenticate using the username found in the request environ.

        The passed ``username`` and ``password`` are not used; the only way
        to log in through this plugin is via ``kwargs['environ']``.

        Return None on failure. On success, return a dictionary of the form:
        see: RhodeCodeAuthPluginBase.auth_func_attrs

        :param userobj: already resolved user object, or a falsy value.
        :param username: ignored.
        :param password: ignored.
        :param settings: plugin settings mapping.
        :param kwargs: expected to carry ``environ``.
        """
        environ = kwargs.get('environ')
        if not environ:
            log.debug('Empty environ data skipping...')
            return None

        if not userobj:
            userobj = self.get_user('', environ=environ, settings=settings)

        # Prefer the name stored on the user object; when there is no such
        # user yet, fall back to what the environ says.
        username = userobj.username if userobj else None
        username = username or self._get_username(environ, settings)

        # if cannot fetch username, it's a no-go for this plugin to proceed
        if not username:
            return None

        # old attrs fetched from RhodeCode database
        stored_firstname = getattr(userobj, 'firstname', '')
        stored_lastname = getattr(userobj, 'lastname', '')

        user_attrs = {
            'username': username,
            'firstname': safe_str(stored_firstname or username),
            'lastname': safe_str(stored_lastname or ''),
            'groups': [],
            'user_group_sync': False,
            'email': getattr(userobj, 'email', '') or '',
            'admin': getattr(userobj, 'admin', False) or False,
            'active': getattr(userobj, 'active', True),
            'active_from_extern': True,
            'extern_name': username,
            'extern_type': getattr(userobj, 'extern_type', ''),
        }

        log_extra = {
            "action": "user_auth_ok",
            "auth_module": "auth_headers",
            "username": user_attrs["username"],
        }
        log.info('user `%s` authenticated correctly', user_attrs['username'],
                 extra=log_extra)
        return user_attrs
r3240 | ||||
def includeme(config):
    """
    Create the headers plugin and let it register itself on ``config``.

    :param config: configurator object passed on to the plugin's own
        ``includeme``.
    """
    plugin = plugin_factory(
        f'egg:rhodecode-enterprise-ce#{RhodeCodeAuthPlugin.uid}')
    plugin.includeme(config)