auth_token.py
147 lines
| 5.3 KiB
| text/x-python
|
PythonLexer
r79 | # -*- coding: utf-8 -*- | |||
r2487 | # Copyright (C) 2016-2018 RhodeCode GmbH | |||
r79 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
RhodeCode authentication token plugin for built in internal auth | ||||
""" | ||||
import logging | ||||
from rhodecode.translation import _ | ||||
r1454 | from rhodecode.authentication.base import ( | |||
RhodeCodeAuthPluginBase, VCS_TYPE, hybrid_property) | ||||
r79 | from rhodecode.authentication.routes import AuthnPluginResourceBase | |||
r1510 | from rhodecode.model.db import User, UserApiKeys, Repository | |||
r79 | ||||
log = logging.getLogger(__name__) | ||||
def plugin_factory(plugin_id, *args, **kwds): | ||||
plugin = RhodeCodeAuthPlugin(plugin_id) | ||||
return plugin | ||||
class RhodecodeAuthnResource(AuthnPluginResourceBase): | ||||
pass | ||||
class RhodeCodeAuthPlugin(RhodeCodeAuthPluginBase): | ||||
""" | ||||
Enables usage of authentication tokens for vcs operations. | ||||
""" | ||||
def includeme(self, config): | ||||
config.add_authn_plugin(self) | ||||
config.add_authn_resource(self.get_id(), RhodecodeAuthnResource(self)) | ||||
config.add_view( | ||||
'rhodecode.authentication.views.AuthnPluginViewBase', | ||||
attr='settings_get', | ||||
r1282 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', | |||
r79 | request_method='GET', | |||
route_name='auth_home', | ||||
context=RhodecodeAuthnResource) | ||||
config.add_view( | ||||
'rhodecode.authentication.views.AuthnPluginViewBase', | ||||
attr='settings_post', | ||||
r1282 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', | |||
r79 | request_method='POST', | |||
route_name='auth_home', | ||||
context=RhodecodeAuthnResource) | ||||
def get_display_name(self): | ||||
return _('Rhodecode Token Auth') | ||||
@hybrid_property | ||||
def name(self): | ||||
return "authtoken" | ||||
def user_activation_state(self): | ||||
r1997 | def_user_perms = User.get_default_user().AuthUser().permissions['global'] | |||
r79 | return 'hg.register.auto_activate' in def_user_perms | |||
def allows_authentication_from( | ||||
self, user, allows_non_existing_user=True, | ||||
allowed_auth_plugins=None, allowed_auth_sources=None): | ||||
""" | ||||
Custom method for this auth that doesn't accept empty users. And also | ||||
r440 | allows users from all other active plugins to use it and also | |||
authenticate against it. But only via vcs mode | ||||
r79 | """ | |||
r440 | from rhodecode.authentication.base import get_authn_registry | |||
authn_registry = get_authn_registry() | ||||
active_plugins = set( | ||||
[x.name for x in authn_registry.get_plugins_for_authentication()]) | ||||
active_plugins.discard(self.name) | ||||
allowed_auth_plugins = [self.name] + list(active_plugins) | ||||
r79 | # only for vcs operations | |||
allowed_auth_sources = [VCS_TYPE] | ||||
return super(RhodeCodeAuthPlugin, self).allows_authentication_from( | ||||
user, allows_non_existing_user=False, | ||||
allowed_auth_plugins=allowed_auth_plugins, | ||||
allowed_auth_sources=allowed_auth_sources) | ||||
def auth(self, userobj, username, password, settings, **kwargs): | ||||
if not userobj: | ||||
r3061 | log.debug('userobj was:%s skipping', userobj) | |||
r79 | return None | |||
user_attrs = { | ||||
"username": userobj.username, | ||||
"firstname": userobj.firstname, | ||||
"lastname": userobj.lastname, | ||||
"groups": [], | ||||
r2495 | 'user_group_sync': False, | |||
r79 | "email": userobj.email, | |||
"admin": userobj.admin, | ||||
"active": userobj.active, | ||||
"active_from_extern": userobj.active, | ||||
"extern_name": userobj.user_id, | ||||
"extern_type": userobj.extern_type, | ||||
} | ||||
log.debug('Authenticating user with args %s', user_attrs) | ||||
if userobj.active: | ||||
r1510 | # calling context repo for token scopes | |||
scope_repo_id = None | ||||
if self.acl_repo_name: | ||||
repo = Repository.get_by_repo_name(self.acl_repo_name) | ||||
scope_repo_id = repo.repo_id if repo else None | ||||
r1421 | token_match = userobj.authenticate_by_token( | |||
r1510 | password, roles=[UserApiKeys.ROLE_VCS], | |||
scope_repo_id=scope_repo_id) | ||||
r1421 | ||||
if userobj.username == username and token_match: | ||||
r79 | log.info( | |||
'user `%s` successfully authenticated via %s', | ||||
user_attrs['username'], self.name) | ||||
return user_attrs | ||||
r2679 | log.warn( | |||
r79 | 'user `%s` failed to authenticate via %s, reason: bad or ' | |||
'inactive token.', username, self.name) | ||||
else: | ||||
log.warning( | ||||
'user `%s` failed to authenticate via %s, reason: account not ' | ||||
'active.', username, self.name) | ||||
return None | ||||