auth_token.py
175 lines
| 6.3 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2016-2023 RhodeCode GmbH | |||
r79 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
RhodeCode authentication token plugin for built in internal auth | ||||
""" | ||||
import logging | ||||
r3392 | import colander | |||
r79 | ||||
r3392 | from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase | |||
r79 | from rhodecode.translation import _ | |||
r1454 | from rhodecode.authentication.base import ( | |||
RhodeCodeAuthPluginBase, VCS_TYPE, hybrid_property) | ||||
r79 | from rhodecode.authentication.routes import AuthnPluginResourceBase | |||
r1510 | from rhodecode.model.db import User, UserApiKeys, Repository | |||
r79 | ||||
log = logging.getLogger(__name__) | ||||
r3253 | def plugin_factory(plugin_id, *args, **kwargs): | |||
r79 | plugin = RhodeCodeAuthPlugin(plugin_id) | |||
return plugin | ||||
class RhodecodeAuthnResource(AuthnPluginResourceBase): | ||||
pass | ||||
class RhodeCodeAuthPlugin(RhodeCodeAuthPluginBase): | ||||
""" | ||||
Enables usage of authentication tokens for vcs operations. | ||||
""" | ||||
r3246 | uid = 'token' | |||
r3392 | AUTH_RESTRICTION_SCOPE_VCS = 'scope_vcs' | |||
r79 | ||||
def includeme(self, config): | ||||
config.add_authn_plugin(self) | ||||
config.add_authn_resource(self.get_id(), RhodecodeAuthnResource(self)) | ||||
config.add_view( | ||||
'rhodecode.authentication.views.AuthnPluginViewBase', | ||||
attr='settings_get', | ||||
r1282 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', | |||
r79 | request_method='GET', | |||
route_name='auth_home', | ||||
context=RhodecodeAuthnResource) | ||||
config.add_view( | ||||
'rhodecode.authentication.views.AuthnPluginViewBase', | ||||
attr='settings_post', | ||||
r1282 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', | |||
r79 | request_method='POST', | |||
route_name='auth_home', | ||||
context=RhodecodeAuthnResource) | ||||
r3392 | def get_settings_schema(self): | |||
return RhodeCodeSettingsSchema() | ||||
r4545 | def get_display_name(self, load_from_settings=False): | |||
r3234 | return _('Rhodecode Token') | |||
r79 | ||||
r3232 | @classmethod | |||
def docs(cls): | ||||
return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-token.html" | ||||
r79 | @hybrid_property | |||
def name(self): | ||||
r5094 | return "authtoken" | |||
r79 | ||||
def user_activation_state(self): | ||||
r1997 | def_user_perms = User.get_default_user().AuthUser().permissions['global'] | |||
r79 | return 'hg.register.auto_activate' in def_user_perms | |||
def allows_authentication_from( | ||||
self, user, allows_non_existing_user=True, | ||||
allowed_auth_plugins=None, allowed_auth_sources=None): | ||||
""" | ||||
Custom method for this auth that doesn't accept empty users. And also | ||||
r440 | allows users from all other active plugins to use it and also | |||
authenticate against it. But only via vcs mode | ||||
r79 | """ | |||
r440 | from rhodecode.authentication.base import get_authn_registry | |||
authn_registry = get_authn_registry() | ||||
active_plugins = set( | ||||
[x.name for x in authn_registry.get_plugins_for_authentication()]) | ||||
active_plugins.discard(self.name) | ||||
allowed_auth_plugins = [self.name] + list(active_plugins) | ||||
r79 | # only for vcs operations | |||
allowed_auth_sources = [VCS_TYPE] | ||||
r5094 | return super().allows_authentication_from( | |||
r79 | user, allows_non_existing_user=False, | |||
allowed_auth_plugins=allowed_auth_plugins, | ||||
allowed_auth_sources=allowed_auth_sources) | ||||
def auth(self, userobj, username, password, settings, **kwargs): | ||||
if not userobj: | ||||
r3061 | log.debug('userobj was:%s skipping', userobj) | |||
r79 | return None | |||
user_attrs = { | ||||
"username": userobj.username, | ||||
"firstname": userobj.firstname, | ||||
"lastname": userobj.lastname, | ||||
"groups": [], | ||||
r2495 | 'user_group_sync': False, | |||
r79 | "email": userobj.email, | |||
"admin": userobj.admin, | ||||
"active": userobj.active, | ||||
"active_from_extern": userobj.active, | ||||
"extern_name": userobj.user_id, | ||||
"extern_type": userobj.extern_type, | ||||
} | ||||
log.debug('Authenticating user with args %s', user_attrs) | ||||
if userobj.active: | ||||
r1510 | # calling context repo for token scopes | |||
scope_repo_id = None | ||||
if self.acl_repo_name: | ||||
repo = Repository.get_by_repo_name(self.acl_repo_name) | ||||
scope_repo_id = repo.repo_id if repo else None | ||||
r1421 | token_match = userobj.authenticate_by_token( | |||
r1510 | password, roles=[UserApiKeys.ROLE_VCS], | |||
scope_repo_id=scope_repo_id) | ||||
r1421 | ||||
if userobj.username == username and token_match: | ||||
r79 | log.info( | |||
'user `%s` successfully authenticated via %s', | ||||
user_attrs['username'], self.name) | ||||
return user_attrs | ||||
r3392 | log.warning('user `%s` failed to authenticate via %s, reason: bad or ' | |||
'inactive token.', username, self.name) | ||||
r79 | else: | |||
r3392 | log.warning('user `%s` failed to authenticate via %s, reason: account not ' | |||
'active.', username, self.name) | ||||
r79 | return None | |||
r3240 | ||||
def includeme(config): | ||||
r5094 | plugin_id = f'egg:rhodecode-enterprise-ce#{RhodeCodeAuthPlugin.uid}' | |||
r3240 | plugin_factory(plugin_id).includeme(config) | |||
r3392 | ||||
class RhodeCodeSettingsSchema(AuthnPluginSettingsSchemaBase): | ||||
auth_scope_choices = [ | ||||
(RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS, 'VCS only'), | ||||
] | ||||
scope_restriction = colander.SchemaNode( | ||||
colander.String(), | ||||
default=auth_scope_choices[0], | ||||
description=_('Choose operation scope restriction when authenticating.'), | ||||
title=_('Scope restriction'), | ||||
validator=colander.OneOf([x[0] for x in auth_scope_choices]), | ||||
widget='select_with_labels', | ||||
choices=auth_scope_choices | ||||
) | ||||