##// END OF EJS Templates
fix(caching): fixed problems with Cache query for users....
fix(caching): fixed problems with Cache query for users. The old way of querying caused the user get query to be always cached, and returning old results even in 2fa forms. The new limited query doesn't cache the user object resolving issues

File last commit:

r5094:71df309f default
r5365:ae8a165b default
Show More
auth_crowd.py
295 lines | 10.8 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2012-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
RhodeCode authentication plugin for Atlassian CROWD
"""
import colander
import base64
import logging
authentication: fixed for python3 migrations
r5057 import urllib.request
import urllib.error
import urllib.parse
project: added all source files and assets
r1
auth: refactor code and simplified instructions....
r1454 from rhodecode.translation import _
from rhodecode.authentication.base import (
RhodeCodeExternalAuthPlugin, hybrid_property)
project: added all source files and assets
r1 from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase
from rhodecode.authentication.routes import AuthnPluginResourceBase
authn: Add whitespace stripping to authentication plugin settings.
r55 from rhodecode.lib.colander_utils import strip_whitespace
project: added all source files and assets
r1 from rhodecode.lib.ext_json import json, formatted_json
from rhodecode.model.db import User
log = logging.getLogger(__name__)
auth-plugins: some code cleanup + added docs for main plugin.
r3253 def plugin_factory(plugin_id, *args, **kwargs):
project: added all source files and assets
r1 """
Factory function that is called during plugin discovery.
It returns the plugin instance.
"""
plugin = RhodeCodeAuthPlugin(plugin_id)
return plugin
class CrowdAuthnResource(AuthnPluginResourceBase):
pass
class CrowdSettingsSchema(AuthnPluginSettingsSchemaBase):
host = colander.SchemaNode(
colander.String(),
default='127.0.0.1',
description=_('The FQDN or IP of the Atlassian CROWD Server'),
authn: Add whitespace stripping to authentication plugin settings.
r55 preparer=strip_whitespace,
project: added all source files and assets
r1 title=_('Host'),
widget='string')
port = colander.SchemaNode(
colander.Int(),
default=8095,
description=_('The Port in use by the Atlassian CROWD Server'),
authn: Add whitespace stripping to authentication plugin settings.
r55 preparer=strip_whitespace,
project: added all source files and assets
r1 title=_('Port'),
validator=colander.Range(min=0, max=65536),
widget='int')
app_name = colander.SchemaNode(
colander.String(),
default='',
description=_('The Application Name to authenticate to CROWD'),
authn: Add whitespace stripping to authentication plugin settings.
r55 preparer=strip_whitespace,
project: added all source files and assets
r1 title=_('Application Name'),
widget='string')
app_password = colander.SchemaNode(
colander.String(),
default='',
description=_('The password to authenticate to CROWD'),
authn: Add whitespace stripping to authentication plugin settings.
r55 preparer=strip_whitespace,
project: added all source files and assets
r1 title=_('Application Password'),
widget='password')
admin_groups = colander.SchemaNode(
colander.String(),
default='',
description=_('A comma separated list of group names that identify '
'users as RhodeCode Administrators'),
missing='',
authn: Add whitespace stripping to authentication plugin settings.
r55 preparer=strip_whitespace,
project: added all source files and assets
r1 title=_('Admin Groups'),
widget='string')
class CrowdServer(object):
def __init__(self, *args, **kwargs):
"""
Create a new CrowdServer object that points to IP/Address 'host',
on the given port, and using the given method (https/http). user and
passwd can be set here or with set_credentials. If unspecified,
"version" defaults to "latest".
example::
cserver = CrowdServer(host="127.0.0.1",
port="8095",
user="some_app",
passwd="some_passwd",
version="1")
"""
lint: ruff run
r5089 if 'port' not in kwargs:
project: added all source files and assets
r1 kwargs["port"] = "8095"
self._logger = kwargs.get("logger", logging.getLogger(__name__))
self._uri = "%s://%s:%s/crowd" % (kwargs.get("method", "http"),
kwargs.get("host", "127.0.0.1"),
kwargs.get("port", "8095"))
self.set_credentials(kwargs.get("user", ""),
kwargs.get("passwd", ""))
self._version = kwargs.get("version", "latest")
self._url_list = None
self._appname = "crowd"
def set_credentials(self, user, passwd):
self.user = user
self.passwd = passwd
self._make_opener()
def _make_opener(self):
python3: fix urllib usage
r4914 mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
project: added all source files and assets
r1 mgr.add_password(None, self._uri, self.user, self.passwd)
python3: fix urllib usage
r4914 handler = urllib.request.HTTPBasicAuthHandler(mgr)
self.opener = urllib.request.build_opener(handler)
project: added all source files and assets
r1
def _request(self, url, body=None, headers=None,
method=None, noformat=False,
empty_response_ok=False):
_headers = {"Content-type": "application/json",
"Accept": "application/json"}
if self.user and self.passwd:
authentication: run modernize for python3
r5094 authstring = base64.b64encode("{}:{}".format(self.user, self.passwd))
project: added all source files and assets
r1 _headers["Authorization"] = "Basic %s" % authstring
if headers:
_headers.update(headers)
log.debug("Sent crowd: \n%s"
% (formatted_json({"url": url, "body": body,
"headers": _headers})))
python3: fix urllib usage
r4914 request = urllib.request.Request(url, body, _headers)
project: added all source files and assets
r1 if method:
request.get_method = lambda: method
global msg
msg = ""
try:
dan
auth-crawd: py3 compat
r4350 ret_doc = self.opener.open(request)
msg = ret_doc.read()
project: added all source files and assets
r1 if not msg and empty_response_ok:
dan
auth-crawd: py3 compat
r4350 ret_val = {}
ret_val["status"] = True
ret_val["error"] = "Response body was empty"
project: added all source files and assets
r1 elif not noformat:
dan
auth-crawd: py3 compat
r4350 ret_val = json.loads(msg)
ret_val["status"] = True
project: added all source files and assets
r1 else:
dan
auth-crawd: py3 compat
r4350 ret_val = msg
project: added all source files and assets
r1 except Exception as e:
if not noformat:
dan
auth-crawd: py3 compat
r4350 ret_val = {"status": False,
"body": body,
authentication: run modernize for python3
r5094 "error": f"{e}\n{msg}"}
project: added all source files and assets
r1 else:
dan
auth-crawd: py3 compat
r4350 ret_val = None
return ret_val
project: added all source files and assets
r1
def user_auth(self, username, password):
"""Authenticate a user against crowd. Returns brief information about
the user."""
url = ("%s/rest/usermanagement/%s/authentication?username=%s"
% (self._uri, self._version, username))
body = json.dumps({"value": password})
return self._request(url, body)
def user_groups(self, username):
"""Retrieve a list of groups to which this user belongs."""
url = ("%s/rest/usermanagement/%s/user/group/nested?username=%s"
% (self._uri, self._version, username))
return self._request(url)
class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
authentication: use registerd UID for plugin definition for more consistent loading of auth plugins.
r3246 uid = 'crowd'
auth-plugins: define unsafe settings
r1623 _settings_unsafe_keys = ['app_password']
project: added all source files and assets
r1
def includeme(self, config):
config.add_authn_plugin(self)
config.add_authn_resource(self.get_id(), CrowdAuthnResource(self))
config.add_view(
'rhodecode.authentication.views.AuthnPluginViewBase',
attr='settings_get',
templating: use .mako as extensions for template files.
r1282 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
project: added all source files and assets
r1 request_method='GET',
route_name='auth_home',
context=CrowdAuthnResource)
config.add_view(
'rhodecode.authentication.views.AuthnPluginViewBase',
attr='settings_post',
templating: use .mako as extensions for template files.
r1282 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
project: added all source files and assets
r1 request_method='POST',
route_name='auth_home',
context=CrowdAuthnResource)
def get_settings_schema(self):
return CrowdSettingsSchema()
auth: allow custom name for plugins if defined in the settings.
r4545 def get_display_name(self, load_from_settings=False):
project: added all source files and assets
r1 return _('CROWD')
auth-plugins: expose docs and icon methods for authentication.
r3232 @classmethod
def docs(cls):
return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-crowd.html"
project: added all source files and assets
r1 @hybrid_property
def name(self):
authentication: run modernize for python3
r5094 return "crowd"
project: added all source files and assets
r1
def use_fake_password(self):
return True
def user_activation_state(self):
users: make AuthUser propert a method, and allow override of params.
r1997 def_user_perms = User.get_default_user().AuthUser().permissions['global']
project: added all source files and assets
r1 return 'hg.extern_activate.auto' in def_user_perms
def auth(self, userobj, username, password, settings, **kwargs):
"""
Given a user object (which may be null), username, a plaintext password,
and a settings object (containing all the keys needed as listed in settings()),
authenticate this user's login attempt.
Return None on failure. On success, return a dictionary of the form:
see: RhodeCodeAuthPluginBase.auth_func_attrs
This is later validated for correctness
"""
if not username or not password:
log.debug('Empty username or password skipping...')
return None
logging: use lazy parameter evaluation in log calls.
r3061 log.debug("Crowd settings: \n%s", formatted_json(settings))
project: added all source files and assets
r1 server = CrowdServer(**settings)
server.set_credentials(settings["app_name"], settings["app_password"])
crowd_user = server.user_auth(username, password)
logging: use lazy parameter evaluation in log calls.
r3061 log.debug("Crowd returned: \n%s", formatted_json(crowd_user))
project: added all source files and assets
r1 if not crowd_user["status"]:
return None
res = server.user_groups(crowd_user["name"])
logging: use lazy parameter evaluation in log calls.
r3061 log.debug("Crowd groups: \n%s", formatted_json(res))
project: added all source files and assets
r1 crowd_user["groups"] = [x["name"] for x in res["groups"]]
# old attrs fetched from RhodeCode database
admin = getattr(userobj, 'admin', False)
active = getattr(userobj, 'active', True)
email = getattr(userobj, 'email', '')
username = getattr(userobj, 'username', username)
firstname = getattr(userobj, 'firstname', '')
lastname = getattr(userobj, 'lastname', '')
extern_type = getattr(userobj, 'extern_type', '')
user_attrs = {
'username': username,
'firstname': crowd_user["first-name"] or firstname,
'lastname': crowd_user["last-name"] or lastname,
'groups': crowd_user["groups"],
authentication: introduce a group sync flag for plugins....
r2495 'user_group_sync': True,
project: added all source files and assets
r1 'email': crowd_user["email"] or email,
'admin': admin,
'active': active,
'active_from_extern': crowd_user.get('active'),
'extern_name': crowd_user["name"],
'extern_type': extern_type,
}
# set an admin if we're in admin_groups of crowd
for group in settings["admin_groups"]:
if group in user_attrs["groups"]:
user_attrs["admin"] = True
logging: use lazy parameter evaluation in log calls.
r3061 log.debug("Final crowd user object: \n%s", formatted_json(user_attrs))
log.info('user `%s` authenticated correctly', user_attrs['username'])
project: added all source files and assets
r1 return user_attrs
core: change from homebrew plugin system into pyramid machinery....
r3240
def includeme(config):
authentication: run modernize for python3
r5094 plugin_id = f'egg:rhodecode-enterprise-ce#{RhodeCodeAuthPlugin.uid}'
core: change from homebrew plugin system into pyramid machinery....
r3240 plugin_factory(plugin_id).includeme(config)