auth_ldap.py
361 lines
| 13.8 KiB
| text/x-python
|
PythonLexer
Bradley M. Kuhn
|
r4116 | # -*- coding: utf-8 -*- | ||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
""" | ||||
rhodecode.lib.auth_modules.auth_ldap | ||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
RhodeCode authentication plugin for LDAP | ||||
:created_on: Created on Nov 17, 2010 | ||||
:author: marcink | ||||
:copyright: (c) 2013 RhodeCode GmbH. | ||||
:license: GPLv3, see LICENSE for more details. | ||||
""" | ||||
import logging | ||||
import traceback | ||||
from rhodecode.lib import auth_modules | ||||
from rhodecode.lib.compat import hybrid_property | ||||
from rhodecode.lib.utils2 import safe_unicode, safe_str | ||||
from rhodecode.lib.exceptions import ( | ||||
LdapConnectionError, LdapUsernameError, LdapPasswordError, LdapImportError | ||||
) | ||||
from rhodecode.model.db import User | ||||
log = logging.getLogger(__name__) | ||||
try: | ||||
import ldap | ||||
except ImportError: | ||||
# means that python-ldap is not installed | ||||
ldap = None | ||||
class AuthLdap(object): | ||||
def __init__(self, server, base_dn, port=389, bind_dn='', bind_pass='', | ||||
tls_kind='PLAIN', tls_reqcert='DEMAND', ldap_version=3, | ||||
ldap_filter='(&(objectClass=user)(!(objectClass=computer)))', | ||||
search_scope='SUBTREE', attr_login='uid'): | ||||
if ldap is None: | ||||
raise LdapImportError | ||||
self.ldap_version = ldap_version | ||||
ldap_server_type = 'ldap' | ||||
self.TLS_KIND = tls_kind | ||||
if self.TLS_KIND == 'LDAPS': | ||||
port = port or 689 | ||||
ldap_server_type = ldap_server_type + 's' | ||||
OPT_X_TLS_DEMAND = 2 | ||||
self.TLS_REQCERT = getattr(ldap, 'OPT_X_TLS_%s' % tls_reqcert, | ||||
OPT_X_TLS_DEMAND) | ||||
# split server into list | ||||
self.LDAP_SERVER_ADDRESS = server.split(',') | ||||
self.LDAP_SERVER_PORT = port | ||||
# USE FOR READ ONLY BIND TO LDAP SERVER | ||||
self.LDAP_BIND_DN = safe_str(bind_dn) | ||||
self.LDAP_BIND_PASS = safe_str(bind_pass) | ||||
_LDAP_SERVERS = [] | ||||
for host in self.LDAP_SERVER_ADDRESS: | ||||
_LDAP_SERVERS.append("%s://%s:%s" % (ldap_server_type, | ||||
host.replace(' ', ''), | ||||
self.LDAP_SERVER_PORT)) | ||||
self.LDAP_SERVER = str(', '.join(s for s in _LDAP_SERVERS)) | ||||
self.BASE_DN = safe_str(base_dn) | ||||
self.LDAP_FILTER = safe_str(ldap_filter) | ||||
self.SEARCH_SCOPE = getattr(ldap, 'SCOPE_%s' % search_scope) | ||||
self.attr_login = attr_login | ||||
def authenticate_ldap(self, username, password): | ||||
""" | ||||
Authenticate a user via LDAP and return his/her LDAP properties. | ||||
Raises AuthenticationError if the credentials are rejected, or | ||||
EnvironmentError if the LDAP server can't be reached. | ||||
:param username: username | ||||
:param password: password | ||||
""" | ||||
from rhodecode.lib.helpers import chop_at | ||||
uid = chop_at(username, "@%s" % self.LDAP_SERVER_ADDRESS) | ||||
if not password: | ||||
log.debug("Attempt to authenticate LDAP user " | ||||
"with blank password rejected.") | ||||
raise LdapPasswordError() | ||||
if "," in username: | ||||
raise LdapUsernameError("invalid character in username: ,") | ||||
try: | ||||
if hasattr(ldap, 'OPT_X_TLS_CACERTDIR'): | ||||
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, | ||||
'/etc/openldap/cacerts') | ||||
ldap.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF) | ||||
ldap.set_option(ldap.OPT_RESTART, ldap.OPT_ON) | ||||
ldap.set_option(ldap.OPT_TIMEOUT, 20) | ||||
ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) | ||||
ldap.set_option(ldap.OPT_TIMELIMIT, 15) | ||||
if self.TLS_KIND != 'PLAIN': | ||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, self.TLS_REQCERT) | ||||
server = ldap.initialize(self.LDAP_SERVER) | ||||
if self.ldap_version == 2: | ||||
server.protocol = ldap.VERSION2 | ||||
else: | ||||
server.protocol = ldap.VERSION3 | ||||
if self.TLS_KIND == 'START_TLS': | ||||
server.start_tls_s() | ||||
if self.LDAP_BIND_DN and self.LDAP_BIND_PASS: | ||||
log.debug('Trying simple_bind with password and given DN: %s' | ||||
% self.LDAP_BIND_DN) | ||||
server.simple_bind_s(self.LDAP_BIND_DN, self.LDAP_BIND_PASS) | ||||
filter_ = '(&%s(%s=%s))' % (self.LDAP_FILTER, self.attr_login, | ||||
username) | ||||
log.debug("Authenticating %r filter %s at %s", self.BASE_DN, | ||||
filter_, self.LDAP_SERVER) | ||||
lobjects = server.search_ext_s(self.BASE_DN, self.SEARCH_SCOPE, | ||||
filter_) | ||||
if not lobjects: | ||||
raise ldap.NO_SUCH_OBJECT() | ||||
for (dn, _attrs) in lobjects: | ||||
if dn is None: | ||||
continue | ||||
try: | ||||
log.debug('Trying simple bind with %s' % dn) | ||||
server.simple_bind_s(dn, safe_str(password)) | ||||
attrs = server.search_ext_s(dn, ldap.SCOPE_BASE, | ||||
'(objectClass=*)')[0][1] | ||||
break | ||||
except ldap.INVALID_CREDENTIALS: | ||||
log.debug("LDAP rejected password for user '%s' (%s): %s" | ||||
% (uid, username, dn)) | ||||
else: | ||||
log.debug("No matching LDAP objects for authentication " | ||||
"of '%s' (%s)", uid, username) | ||||
raise LdapPasswordError() | ||||
except ldap.NO_SUCH_OBJECT: | ||||
log.debug("LDAP says no such user '%s' (%s)" % (uid, username)) | ||||
raise LdapUsernameError() | ||||
except ldap.SERVER_DOWN: | ||||
raise LdapConnectionError("LDAP can't access authentication server") | ||||
return dn, attrs | ||||
class RhodeCodeAuthPlugin(auth_modules.RhodeCodeExternalAuthPlugin): | ||||
def __init__(self): | ||||
self._logger = logging.getLogger(__name__) | ||||
self._tls_kind_values = ["PLAIN", "LDAPS", "START_TLS"] | ||||
self._tls_reqcert_values = ["NEVER", "ALLOW", "TRY", "DEMAND", "HARD"] | ||||
self._search_scopes = ["BASE", "ONELEVEL", "SUBTREE"] | ||||
@hybrid_property | ||||
def name(self): | ||||
return "ldap" | ||||
def settings(self): | ||||
settings = [ | ||||
{ | ||||
"name": "host", | ||||
"validator": self.validators.UnicodeString(strip=True), | ||||
"type": "string", | ||||
"description": "Host of the LDAP Server", | ||||
"formname": "LDAP Host" | ||||
}, | ||||
{ | ||||
"name": "port", | ||||
"validator": self.validators.Number(strip=True, not_empty=True), | ||||
"type": "string", | ||||
"description": "Port that the LDAP server is listening on", | ||||
"default": 389, | ||||
"formname": "Port" | ||||
}, | ||||
{ | ||||
"name": "dn_user", | ||||
"validator": self.validators.UnicodeString(strip=True), | ||||
"type": "string", | ||||
"description": "User to connect to LDAP", | ||||
"formname": "Account" | ||||
}, | ||||
{ | ||||
"name": "dn_pass", | ||||
"validator": self.validators.UnicodeString(strip=True), | ||||
"type": "password", | ||||
"description": "Password to connect to LDAP", | ||||
"formname": "Password" | ||||
}, | ||||
{ | ||||
"name": "tls_kind", | ||||
"validator": self.validators.OneOf(self._tls_kind_values), | ||||
"type": "select", | ||||
"values": self._tls_kind_values, | ||||
"description": "TLS Type", | ||||
"default": 'PLAIN', | ||||
"formname": "Connection Security" | ||||
}, | ||||
{ | ||||
"name": "tls_reqcert", | ||||
"validator": self.validators.OneOf(self._tls_reqcert_values), | ||||
"type": "select", | ||||
"values": self._tls_reqcert_values, | ||||
"description": "Require Cert over TLS?", | ||||
"formname": "Certificate Checks" | ||||
}, | ||||
{ | ||||
"name": "base_dn", | ||||
"validator": self.validators.UnicodeString(strip=True), | ||||
"type": "string", | ||||
"description": "Base DN to search (e.g., dc=mydomain,dc=com)", | ||||
"formname": "Base DN" | ||||
}, | ||||
{ | ||||
"name": "filter", | ||||
"validator": self.validators.UnicodeString(strip=True), | ||||
"type": "string", | ||||
"description": "Filter to narrow results (e.g., ou=Users, etc)", | ||||
"formname": "LDAP Search Filter" | ||||
}, | ||||
{ | ||||
"name": "search_scope", | ||||
"validator": self.validators.OneOf(self._search_scopes), | ||||
"type": "select", | ||||
"values": self._search_scopes, | ||||
"description": "How deep to search LDAP", | ||||
"formname": "LDAP Search Scope" | ||||
}, | ||||
{ | ||||
"name": "attr_login", | ||||
"validator": self.validators.AttrLoginValidator(not_empty=True, strip=True), | ||||
"type": "string", | ||||
"description": "LDAP Attribute to map to user name", | ||||
"formname": "Login Attribute" | ||||
}, | ||||
{ | ||||
"name": "attr_firstname", | ||||
"validator": self.validators.UnicodeString(strip=True), | ||||
"type": "string", | ||||
"description": "LDAP Attribute to map to first name", | ||||
"formname": "First Name Attribute" | ||||
}, | ||||
{ | ||||
"name": "attr_lastname", | ||||
"validator": self.validators.UnicodeString(strip=True), | ||||
"type": "string", | ||||
"description": "LDAP Attribute to map to last name", | ||||
"formname": "Last Name Attribute" | ||||
}, | ||||
{ | ||||
"name": "attr_email", | ||||
"validator": self.validators.UnicodeString(strip=True), | ||||
"type": "string", | ||||
"description": "LDAP Attribute to map to email address", | ||||
"formname": "Email Attribute" | ||||
} | ||||
] | ||||
return settings | ||||
def use_fake_password(self): | ||||
return True | ||||
def user_activation_state(self): | ||||
def_user_perms = User.get_default_user().AuthUser.permissions['global'] | ||||
return 'hg.extern_activate.auto' in def_user_perms | ||||
def auth(self, userobj, username, password, settings, **kwargs): | ||||
""" | ||||
Given a user object (which may be null), username, a plaintext password, | ||||
and a settings object (containing all the keys needed as listed in settings()), | ||||
authenticate this user's login attempt. | ||||
Return None on failure. On success, return a dictionary of the form: | ||||
see: RhodeCodeAuthPluginBase.auth_func_attrs | ||||
This is later validated for correctness | ||||
""" | ||||
if not username or not password: | ||||
log.debug('Empty username or password skipping...') | ||||
return None | ||||
kwargs = { | ||||
'server': settings.get('host', ''), | ||||
'base_dn': settings.get('base_dn', ''), | ||||
'port': settings.get('port'), | ||||
'bind_dn': settings.get('dn_user'), | ||||
'bind_pass': settings.get('dn_pass'), | ||||
'tls_kind': settings.get('tls_kind'), | ||||
'tls_reqcert': settings.get('tls_reqcert'), | ||||
'ldap_filter': settings.get('filter'), | ||||
'search_scope': settings.get('search_scope'), | ||||
'attr_login': settings.get('attr_login'), | ||||
'ldap_version': 3, | ||||
} | ||||
if kwargs['bind_dn'] and not kwargs['bind_pass']: | ||||
log.debug('Using dynamic binding.') | ||||
kwargs['bind_dn'] = kwargs['bind_dn'].replace('$login', username) | ||||
kwargs['bind_pass'] = password | ||||
log.debug('Checking for ldap authentication') | ||||
try: | ||||
aldap = AuthLdap(**kwargs) | ||||
(user_dn, ldap_attrs) = aldap.authenticate_ldap(username, password) | ||||
log.debug('Got ldap DN response %s' % user_dn) | ||||
get_ldap_attr = lambda k: ldap_attrs.get(settings.get(k), [''])[0] | ||||
# old attrs fetched from RhodeCode database | ||||
admin = getattr(userobj, 'admin', False) | ||||
active = getattr(userobj, 'active', True) | ||||
email = getattr(userobj, 'email', '') | ||||
firstname = getattr(userobj, 'firstname', '') | ||||
lastname = getattr(userobj, 'lastname', '') | ||||
extern_type = getattr(userobj, 'extern_type', '') | ||||
user_attrs = { | ||||
'username': username, | ||||
'firstname': safe_unicode(get_ldap_attr('attr_firstname') or firstname), | ||||
'lastname': safe_unicode(get_ldap_attr('attr_lastname') or lastname), | ||||
'groups': [], | ||||
'email': get_ldap_attr('attr_email' or email), | ||||
'admin': admin, | ||||
'active': active, | ||||
"active_from_extern": None, | ||||
'extern_name': user_dn, | ||||
'extern_type': extern_type, | ||||
} | ||||
log.info('user %s authenticated correctly' % user_attrs['username']) | ||||
return user_attrs | ||||
except (LdapUsernameError, LdapPasswordError, LdapImportError): | ||||
log.error(traceback.format_exc()) | ||||
return None | ||||
except (Exception,): | ||||
log.error(traceback.format_exc()) | ||||
return None | ||||