##// END OF EJS Templates
chore(config): optimized celery configuration
chore(config): optimized celery configuration

File last commit:

r5094:71df309f default
r5299:2823fa9f default
Show More
auth_headers.py
231 lines | 8.3 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
# Copyright (C) 2012-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import colander
import logging
auth: refactor code and simplified instructions....
r1454 from rhodecode.translation import _
from rhodecode.authentication.base import (
RhodeCodeExternalAuthPlugin, hybrid_property)
authn: Renamed auth_container -> auth_headers
r59 from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase
from rhodecode.authentication.routes import AuthnPluginResourceBase
from rhodecode.lib.colander_utils import strip_whitespace
authentication: fixed for python3 migrations
r5057 from rhodecode.lib.str_utils import safe_str
from rhodecode.lib.utils2 import str2bool
authn: Renamed auth_container -> auth_headers
r59 from rhodecode.model.db import User
log = logging.getLogger(__name__)
auth-plugins: some code cleanup + added docs for main plugin.
def plugin_factory(plugin_id, *args, **kwargs):
    """
    Create the headers authentication plugin for the given plugin id.

    :param plugin_id: identifier handed over to the plugin constructor
    :param args: accepted but not used
    :param kwargs: accepted but not used
    :return: a new ``RhodeCodeAuthPlugin`` instance
    """
    return RhodeCodeAuthPlugin(plugin_id)
class HeadersAuthnResource(AuthnPluginResourceBase):
    """
    Resource type of the headers authentication plugin; adds nothing on top
    of ``AuthnPluginResourceBase`` and serves as the view context.
    """
class HeadersSettingsSchema(AuthnPluginSettingsSchemaBase):
    """
    Settings schema of the headers authentication plugin.

    Declares the key the username is read from, an optional fallback key and
    a flag that controls cleaning of the extracted username.
    """

    # key that is looked up first when extracting the username
    header = colander.SchemaNode(
        colander.String(),
        title=_('Header'),
        description=_('Header to extract the user from'),
        default='REMOTE_USER',
        preparer=strip_whitespace,
        widget='string')

    # key that is consulted when the main one yields no username
    fallback_header = colander.SchemaNode(
        colander.String(),
        title=_('Fallback header'),
        description=_('Header to extract the user from when main one fails'),
        default='HTTP_X_FORWARDED_USER',
        preparer=strip_whitespace,
        widget='string')

    # when enabled, realm (`user@realm`) and domain (`domain\user`) parts
    # are removed from the extracted username
    clean_username = colander.SchemaNode(
        colander.Boolean(),
        title=_('Clean username'),
        description=_('Perform cleaning of user, if passed user has @ in '
                      'username then first part before @ is taken. '
                      'If there\'s \\ in the username only the part after '
                      ' \\ is taken'),
        default=True,
        missing=False,
        widget='bool')
class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
    """
    External authentication plugin that takes the username from the request
    environ, using the keys described by ``HeadersSettingsSchema``.
    """
    uid = 'headers'

    def includeme(self, config):
        """
        Register this plugin, its settings resource and the settings
        GET/POST views on the given configurator.

        :param config: configurator object providing ``add_authn_plugin``,
            ``add_authn_resource`` and ``add_view``
        """
        config.add_authn_plugin(self)
        config.add_authn_resource(self.get_id(), HeadersAuthnResource(self))

        # both views are identical except for the handler attribute and the
        # HTTP method they answer to
        for view_attr, http_method in (
                ('settings_get', 'GET'),
                ('settings_post', 'POST')):
            config.add_view(
                'rhodecode.authentication.views.AuthnPluginViewBase',
                attr=view_attr,
                renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
                request_method=http_method,
                route_name='auth_home',
                context=HeadersAuthnResource)

    def get_display_name(self, load_from_settings=False):
        """
        Return the translated label of this plugin.

        :param load_from_settings: accepted for interface compatibility,
            not used by this plugin
        """
        return _('Headers')

    def get_settings_schema(self):
        """
        Return a fresh ``HeadersSettingsSchema`` instance.
        """
        return HeadersSettingsSchema()

    @hybrid_property
    def name(self):
        """
        Name of this plugin.
        """
        return "headers"

    @property
    def is_headers_auth(self):
        """
        Marks this plugin as a headers based one; always True.
        """
        return True

    def use_fake_password(self):
        """
        Always True for this plugin.
        """
        return True

    def user_activation_state(self):
        """
        Tell if `hg.extern_activate.auto` is part of the global permissions
        of the default user.
        """
        default_user = User.get_default_user()
        global_perms = default_user.AuthUser().permissions['global']
        return 'hg.extern_activate.auto' in global_perms

    def _clean_username(self, username):
        """
        Strip realm and domain from the username.

        Everything from the first `@` on is dropped, then only the part after
        the last backslash is kept.

        :param username: raw username string
        """
        without_realm = username.partition('@')[0]
        return without_realm.rpartition('\\')[2]

    def _get_username(self, environ, settings):
        """
        Extract the username from environ based on the plugin settings.

        The key stored under ``header`` is tried first; ``fallback_header``
        is only consulted when no username was obtained so far. The result is
        cleaned when the ``clean_username`` setting evaluates to true.

        :param environ: mapping to read the username from, may be empty
        :param settings: plugin settings mapping, may be empty
        :return: extracted username, or None/empty value if nothing was found
        """
        if not environ:
            environ = {}
            log.debug('got empty environ: %s', environ)

        settings = settings or {}

        extracted = None
        # main header first, then fallback mode
        for setting_key in ('header', 'fallback_header'):
            header_name = settings.get(setting_key)
            if not header_name:
                continue
            extracted = environ.get(header_name)
            log.debug('extracted %s:%s', header_name, extracted)
            if extracted:
                break

        if extracted and str2bool(settings.get('clean_username')):
            log.debug('Received username `%s` from headers', extracted)
            extracted = self._clean_username(extracted)
            log.debug('New cleanup user is:%s', extracted)

        return extracted

    def get_user(self, username=None, **kwargs):
        """
        Fetch the user by the username found in environ.

        The passed ``username`` is not used; the username is extracted from
        ``kwargs['environ']`` according to ``kwargs['settings']`` and then
        handed over to the parent class ``get_user``.

        :param username: ignored by this plugin
        :param kwargs: may carry ``environ`` and ``settings``
        """
        header_username = self._get_username(
            kwargs.get('environ') or {},
            kwargs.get('settings') or {})

        # we got the username, so use default method now
        return super().get_user(header_username)

    def auth(self, userobj, username, password, settings, **kwargs):
        """
        Build the user attributes for a user identified through environ.

        The passed ``username`` and ``password`` are ignored; the username is
        taken from ``userobj`` when one is available, otherwise it is read
        from environ according to ``settings``.

        Return None on failure. On success, return a dictionary of the form:

            see: RhodeCodeAuthPluginBase.auth_func_attrs

        :param userobj: already known user object, may be empty; when empty
            it is looked up via ``get_user`` using environ data
        :param username: ignored
        :param password: ignored
        :param settings: plugin settings mapping
        :param kwargs: expected to carry ``environ``
        """
        environ = kwargs.get('environ')
        if not environ:
            log.debug('Empty environ data skipping...')
            return None

        if not userobj:
            userobj = self.get_user('', environ=environ, settings=settings)

        # we don't care passed username/password for headers auth plugins.
        # only way to log in is using environ
        username = userobj.username if userobj else None

        if not username:
            # no user object in DB, extract the username from environ
            # based on the settings
            username = self._get_username(environ, settings)

        # if cannot fetch username, it's a no-go for this plugin to proceed
        if not username:
            return None

        # old attrs fetched from RhodeCode database, with defaults for the
        # case that there is no user object
        is_admin = getattr(userobj, 'admin', False)
        is_active = getattr(userobj, 'active', True)
        user_email = getattr(userobj, 'email', '')
        first_name = getattr(userobj, 'firstname', '')
        last_name = getattr(userobj, 'lastname', '')
        external_type = getattr(userobj, 'extern_type', '')

        user_attrs = {
            'username': username,
            'firstname': safe_str(first_name or username),
            'lastname': safe_str(last_name or ''),
            'groups': [],
            'user_group_sync': False,
            'email': user_email or '',
            'admin': is_admin or False,
            'active': is_active,
            'active_from_extern': True,
            'extern_name': username,
            'extern_type': external_type,
        }

        log_extra = {
            "action": "user_auth_ok",
            "auth_module": "auth_headers",
            "username": user_attrs["username"],
        }
        log.info('user `%s` authenticated correctly', user_attrs['username'],
                 extra=log_extra)
        return user_attrs
core: change from homebrew plugin system into pyramid machinery....
r3240
def includeme(config):
    """
    Create the headers authentication plugin and let it register itself on
    the given configurator.

    :param config: configurator object passed on to the plugin's
        ``includeme`` method
    """
    headers_plugin = plugin_factory(
        f'egg:rhodecode-enterprise-ce#{RhodeCodeAuthPlugin.uid}')
    headers_plugin.includeme(config)