auth_crowd.py
297 lines
| 10.8 KiB
| text/x-python
|
PythonLexer
r5056 | ||||
r1 | ||||
# Copyright (C) 2012-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
RhodeCode authentication plugin for Atlassian CROWD
"""
import colander | ||||
import base64 | ||||
import logging | ||||
r5057 | import urllib.request | |||
import urllib.error | ||||
import urllib.parse | ||||
r1 | ||||
r1454 | from rhodecode.translation import _ | |||
from rhodecode.authentication.base import ( | ||||
RhodeCodeExternalAuthPlugin, hybrid_property) | ||||
r1 | from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase | |||
from rhodecode.authentication.routes import AuthnPluginResourceBase | ||||
r55 | from rhodecode.lib.colander_utils import strip_whitespace | |||
r1 | from rhodecode.lib.ext_json import json, formatted_json | |||
from rhodecode.model.db import User | ||||
# Module-level logger used by the request helper and the auth plugin below.
log = logging.getLogger(__name__)
def plugin_factory(plugin_id, *args, **kwargs):
    """
    Build the CROWD authentication plugin for plugin discovery.

    Only ``plugin_id`` is forwarded to the plugin constructor; any extra
    positional or keyword arguments are accepted and ignored.

    :param plugin_id: identifier passed to ``RhodeCodeAuthPlugin``.
    :returns: a new ``RhodeCodeAuthPlugin`` instance.
    """
    return RhodeCodeAuthPlugin(plugin_id)
class CrowdAuthnResource(AuthnPluginResourceBase):
    """Resource class for the CROWD plugin; adds nothing to its base class."""
class CrowdSettingsSchema(AuthnPluginSettingsSchemaBase):
    """
    Settings schema for the CROWD authentication plugin.

    Declares, in order: the CROWD server ``host`` and ``port``, the
    application credentials (``app_name`` / ``app_password``) and the
    optional comma separated ``admin_groups`` list.
    """

    host = colander.SchemaNode(
        colander.String(),
        default='127.0.0.1',
        description=_('The FQDN or IP of the Atlassian CROWD Server'),
        preparer=strip_whitespace,
        title=_('Host'),
        widget='string')
    port = colander.SchemaNode(
        colander.Int(),
        default=8095,
        description=_('The Port in use by the Atlassian CROWD Server'),
        preparer=strip_whitespace,
        title=_('Port'),
        # 65535 is the highest valid TCP port; the previous upper bound of
        # 65536 let an unusable value pass validation.
        validator=colander.Range(min=0, max=65535),
        widget='int')
    app_name = colander.SchemaNode(
        colander.String(),
        default='',
        description=_('The Application Name to authenticate to CROWD'),
        preparer=strip_whitespace,
        title=_('Application Name'),
        widget='string')
    app_password = colander.SchemaNode(
        colander.String(),
        default='',
        description=_('The password to authenticate to CROWD'),
        preparer=strip_whitespace,
        title=_('Application Password'),
        widget='password')
    # Optional: ``missing=''`` lets the form be saved without admin groups.
    admin_groups = colander.SchemaNode(
        colander.String(),
        default='',
        description=_('A comma separated list of group names that identify '
                      'users as RhodeCode Administrators'),
        missing='',
        preparer=strip_whitespace,
        title=_('Admin Groups'),
        widget='string')
class CrowdServer(object):
    """
    Small HTTP client for the Atlassian CROWD REST ``usermanagement`` API.

    Requests are sent as JSON with HTTP Basic authentication using the
    application credentials set through the constructor or
    :meth:`set_credentials`.
    """

    def __init__(self, *args, **kwargs):
        """
        Create a new CrowdServer object that points to IP/Address 'host',
        on the given port, and using the given method (https/http). user and
        passwd can be set here or with set_credentials. If unspecified,
        "version" defaults to "latest".

        Unknown keyword arguments are ignored, so a full settings dict can
        be passed as ``**settings``.

        example::

            cserver = CrowdServer(host="127.0.0.1",
                                  port="8095",
                                  user="some_app",
                                  passwd="some_passwd",
                                  version="1")
        """
        self._logger = kwargs.get("logger", logging.getLogger(__name__))
        self._uri = "%s://%s:%s/crowd" % (kwargs.get("method", "http"),
                                          kwargs.get("host", "127.0.0.1"),
                                          kwargs.get("port", "8095"))
        self.set_credentials(kwargs.get("user", ""),
                             kwargs.get("passwd", ""))
        self._version = kwargs.get("version", "latest")
        self._url_list = None
        self._appname = "crowd"

    def set_credentials(self, user, passwd):
        """Store the application credentials and rebuild the URL opener."""
        self.user = user
        self.passwd = passwd
        self._make_opener()

    def _make_opener(self):
        """Build ``self.opener`` with a basic-auth handler for ``self._uri``."""
        mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
        mgr.add_password(None, self._uri, self.user, self.passwd)
        handler = urllib.request.HTTPBasicAuthHandler(mgr)
        self.opener = urllib.request.build_opener(handler)

    def _basic_auth_header(self):
        """
        Return the ``Authorization`` header value for the stored credentials,
        or ``None`` when user or password is empty.
        """
        if not (self.user and self.passwd):
            return None
        credentials = "%s:%s" % (self.user, self.passwd)
        # b64encode works on bytes and returns bytes; decode so the header
        # does not end up as "Basic b'...'".
        token = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
        return "Basic %s" % token

    def _user_url(self, resource, username):
        """
        Build the usermanagement URL for ``resource`` with ``username`` as a
        percent-encoded query parameter.
        """
        # The username comes from the login form; escape it so characters
        # such as '&', '#' or spaces cannot alter the query string.
        quoted_username = urllib.parse.quote(username, safe="")
        return ("%s/rest/usermanagement/%s/%s?username=%s"
                % (self._uri, self._version, resource, quoted_username))

    def _request(self, url, body=None, headers=None,
                 method=None, noformat=False,
                 empty_response_ok=False):
        """
        Send a request to CROWD and return the processed response.

        :param url: full URL to call.
        :param body: optional request payload (``str`` or ``bytes``).
        :param headers: optional extra headers, merged over the defaults.
        :param method: optional HTTP method override.
        :param noformat: when true, return the raw response body instead of
            the decoded JSON document.
        :param empty_response_ok: when true, an empty response body is
            reported as success.
        :returns: with ``noformat`` false, a dict that always contains a
            ``status`` key (``False`` plus ``error``/``body`` on failure);
            with ``noformat`` true, the raw body or ``None`` on failure.
        """
        _headers = {"Content-type": "application/json",
                    "Accept": "application/json"}
        auth_header = self._basic_auth_header()
        if auth_header:
            _headers["Authorization"] = auth_header
        if headers:
            _headers.update(headers)

        # Keep credentials out of the log: the Authorization header holds
        # the application password and the body can hold a user password.
        logged_headers = dict(_headers)
        if "Authorization" in logged_headers:
            logged_headers["Authorization"] = "<redacted>"
        log.debug("Sent crowd: \n%s",
                  formatted_json({"url": url,
                                  "body": "<redacted>" if body else body,
                                  "headers": logged_headers}))

        # urllib requires the request payload to be bytes
        data = body.encode("utf-8") if isinstance(body, str) else body
        request = urllib.request.Request(
            url, data, _headers, method=method or None)

        # local on purpose: a module-level global would be shared between
        # concurrent requests
        msg = ""
        try:
            # the context manager closes the response once it has been read
            with self.opener.open(request) as ret_doc:
                msg = ret_doc.read()
            if not msg and empty_response_ok:
                ret_val = {"status": True,
                           "error": "Response body was empty"}
            elif not noformat:
                ret_val = json.loads(msg)
                ret_val["status"] = True
            else:
                ret_val = msg
        except Exception as e:
            # Any transport, HTTP or decoding failure is reported through
            # the return value rather than raised to the caller.
            if not noformat:
                if isinstance(msg, bytes):
                    detail = msg.decode("utf-8", "replace")
                else:
                    detail = msg
                ret_val = {"status": False,
                           "body": body,
                           "error": "{}\n{}".format(e, detail)}
            else:
                ret_val = None
        return ret_val

    def user_auth(self, username, password):
        """Authenticate a user against crowd. Returns brief information about
        the user."""
        url = self._user_url("authentication", username)
        body = json.dumps({"value": password})
        return self._request(url, body)

    def user_groups(self, username):
        """Retrieve a list of groups to which this user belongs."""
        url = self._user_url("user/group/nested", username)
        return self._request(url)
class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
    """External authentication plugin that validates logins against CROWD."""

    uid = 'crowd'
    # Setting keys that hold secrets; `auth` masks them before logging.
    # NOTE(review): presumably also consumed by the base plugin - confirm.
    _settings_unsafe_keys = ['app_password']

    def includeme(self, config):
        """Register the plugin, its resource and its settings views."""
        config.add_authn_plugin(self)
        config.add_authn_resource(self.get_id(), CrowdAuthnResource(self))
        config.add_view(
            'rhodecode.authentication.views.AuthnPluginViewBase',
            attr='settings_get',
            renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
            request_method='GET',
            route_name='auth_home',
            context=CrowdAuthnResource)
        config.add_view(
            'rhodecode.authentication.views.AuthnPluginViewBase',
            attr='settings_post',
            renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
            request_method='POST',
            route_name='auth_home',
            context=CrowdAuthnResource)

    def get_settings_schema(self):
        """Return a new ``CrowdSettingsSchema`` instance."""
        return CrowdSettingsSchema()

    def get_display_name(self, load_from_settings=False):
        """Return the translated display name; the argument is not used."""
        return _('CROWD')

    @classmethod
    def docs(cls):
        """Return the URL of the online documentation for this plugin."""
        return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-crowd.html"

    @hybrid_property
    def name(self):
        return u"crowd"

    def use_fake_password(self):
        return True

    def user_activation_state(self):
        """Tell whether the default user's global permissions contain
        ``hg.extern_activate.auto``."""
        def_user_perms = User.get_default_user().AuthUser().permissions['global']
        return 'hg.extern_activate.auto' in def_user_perms

    def auth(self, userobj, username, password, settings, **kwargs):
        """
        Given a user object (which may be null), username, a plaintext password,
        and a settings object (containing all the keys needed as listed in settings()),
        authenticate this user's login attempt.

        Return None on failure. On success, return a dictionary of the form:

            see: RhodeCodeAuthPluginBase.auth_func_attrs
        This is later validated for correctness
        """
        if not username or not password:
            log.debug('Empty username or password skipping...')
            return None

        # mask secrets (e.g. app_password) before the settings hit the log
        safe_settings = dict(settings)
        for secret_key in self._settings_unsafe_keys:
            if secret_key in safe_settings:
                safe_settings[secret_key] = '<redacted>'
        log.debug("Crowd settings: \n%s", formatted_json(safe_settings))

        server = CrowdServer(**settings)
        server.set_credentials(settings["app_name"], settings["app_password"])
        crowd_user = server.user_auth(username, password)
        # a failed response echoes the request body, which carries the
        # user's plaintext password - leave it out of the log
        log.debug("Crowd returned: \n%s", formatted_json(
            {key: val for key, val in crowd_user.items() if key != "body"}))
        if not crowd_user["status"]:
            return None

        res = server.user_groups(crowd_user["name"])
        log.debug("Crowd groups: \n%s", formatted_json(res))
        if not res["status"] or "groups" not in res:
            # Without group data neither admin status nor group sync can be
            # decided; fail the login instead of raising KeyError below.
            log.error('Crowd group lookup failed for user `%s`', username)
            return None
        crowd_user["groups"] = [x["name"] for x in res["groups"]]

        # old attrs fetched from RhodeCode database
        admin = getattr(userobj, 'admin', False)
        active = getattr(userobj, 'active', True)
        email = getattr(userobj, 'email', '')
        username = getattr(userobj, 'username', username)
        firstname = getattr(userobj, 'firstname', '')
        lastname = getattr(userobj, 'lastname', '')
        extern_type = getattr(userobj, 'extern_type', '')

        user_attrs = {
            'username': username,
            'firstname': crowd_user["first-name"] or firstname,
            'lastname': crowd_user["last-name"] or lastname,
            'groups': crowd_user["groups"],
            'user_group_sync': True,
            'email': crowd_user["email"] or email,
            'admin': admin,
            'active': active,
            'active_from_extern': crowd_user.get('active'),
            'extern_name': crowd_user["name"],
            'extern_type': extern_type,
        }

        # set an admin if we're in admin_groups of crowd
        # The setting is a comma separated string; iterating it directly
        # would compare single characters against the group names.
        admin_groups = settings["admin_groups"] or ''
        if isinstance(admin_groups, str):
            admin_groups = [name.strip() for name in admin_groups.split(',')]
        admin_groups = {name for name in admin_groups if name}
        if admin_groups.intersection(user_attrs["groups"]):
            user_attrs["admin"] = True

        log.debug("Final crowd user object: \n%s", formatted_json(user_attrs))
        log.info('user `%s` authenticated correctly', user_attrs['username'])
        return user_attrs
r3240 | ||||
def includeme(config):
    """
    Module-level include hook.

    Derives the plugin id from ``RhodeCodeAuthPlugin.uid``, creates the
    plugin through ``plugin_factory`` and lets it register itself on
    ``config``.
    """
    plugin = plugin_factory(
        'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid))
    plugin.includeme(config)