##// END OF EJS Templates
Change project URL - use kallithea-scm.org
Change project URL - use kallithea-scm.org

File last commit:

r4116:ffd45b18 rhodecode-2.2.5-gpl
r4184:48ad8455 kallithea-2.2.5-r...
Show More
auth_ldap.py
361 lines | 13.8 KiB | text/x-python | PythonLexer
Bradley M. Kuhn
Imported some of the GPLv3'd changes from RhodeCode v2.2.5....
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
rhodecode.lib.auth_modules.auth_ldap
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
RhodeCode authentication plugin for LDAP
:created_on: Created on Nov 17, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH.
:license: GPLv3, see LICENSE for more details.
"""
import logging
import traceback
from rhodecode.lib import auth_modules
from rhodecode.lib.compat import hybrid_property
from rhodecode.lib.utils2 import safe_unicode, safe_str
from rhodecode.lib.exceptions import (
LdapConnectionError, LdapUsernameError, LdapPasswordError, LdapImportError
)
from rhodecode.model.db import User
log = logging.getLogger(__name__)
# python-ldap is treated as optional here: when the import fails, ``ldap`` is
# bound to None so that this module still imports. AuthLdap.__init__ checks
# for None and raises LdapImportError in that case.
try:
    import ldap
except ImportError:
    # means that python-ldap is not installed
    ldap = None
class AuthLdap(object):
    """
    Helper that authenticates a user against one or more LDAP servers by
    searching for the matching entry and then binding as that entry.
    """

    def __init__(self, server, base_dn, port=389, bind_dn='', bind_pass='',
                 tls_kind='PLAIN', tls_reqcert='DEMAND', ldap_version=3,
                 ldap_filter='(&(objectClass=user)(!(objectClass=computer)))',
                 search_scope='SUBTREE', attr_login='uid'):
        """
        Store the connection and search parameters.

        :param server: comma separated list of LDAP host names
        :param base_dn: DN below which users are searched
        :param port: server port; a falsy value selects the standard port
            for the chosen connection type (636 for LDAPS, 389 otherwise)
        :param bind_dn: DN used for the initial (search) bind
        :param bind_pass: password used for the initial (search) bind
        :param tls_kind: 'PLAIN', 'LDAPS' or 'START_TLS'
        :param tls_reqcert: suffix of a ``ldap.OPT_X_TLS_*`` constant
        :param ldap_version: 2 selects LDAPv2, anything else LDAPv3
        :param ldap_filter: base filter combined with the login attribute
        :param search_scope: suffix of a ``ldap.SCOPE_*`` constant
        :param attr_login: LDAP attribute that is matched against the username
        :raises LdapImportError: if python-ldap could not be imported
        """
        if ldap is None:
            raise LdapImportError

        self.ldap_version = ldap_version
        ldap_server_type = 'ldap'

        self.TLS_KIND = tls_kind

        if self.TLS_KIND == 'LDAPS':
            # 636 is the registered LDAPS port (the previous fallback of 689
            # was a typo).
            port = port or 636
            ldap_server_type = ldap_server_type + 's'
        else:
            # Callers may pass port=None explicitly; avoid building
            # "ldap://host:None".
            port = port or 389

        # Numeric value used when the requested OPT_X_TLS_* constant is not
        # provided by the installed python-ldap.
        OPT_X_TLS_DEMAND = 2
        self.TLS_REQCERT = getattr(ldap, 'OPT_X_TLS_%s' % tls_reqcert,
                                   OPT_X_TLS_DEMAND)
        # split server into list
        self.LDAP_SERVER_ADDRESS = server.split(',')
        self.LDAP_SERVER_PORT = port

        # USE FOR READ ONLY BIND TO LDAP SERVER
        self.LDAP_BIND_DN = safe_str(bind_dn)
        self.LDAP_BIND_PASS = safe_str(bind_pass)

        _LDAP_SERVERS = [
            "%s://%s:%s" % (ldap_server_type, host.replace(' ', ''),
                            self.LDAP_SERVER_PORT)
            for host in self.LDAP_SERVER_ADDRESS
        ]
        self.LDAP_SERVER = str(', '.join(_LDAP_SERVERS))

        self.BASE_DN = safe_str(base_dn)
        self.LDAP_FILTER = safe_str(ldap_filter)
        self.SEARCH_SCOPE = getattr(ldap, 'SCOPE_%s' % search_scope)
        self.attr_login = attr_login

    def authenticate_ldap(self, username, password):
        """
        Authenticate a user via LDAP and return his/her LDAP properties.

        :param username: username
        :param password: password
        :returns: tuple ``(dn, attrs)`` of the entry that accepted the
            password, where ``attrs`` is the attribute mapping of that entry
        :raises LdapPasswordError: if the password is blank or no matching
            entry accepted it
        :raises LdapUsernameError: if the username contains a comma or no
            entry matches the search filter
        :raises LdapConnectionError: if the LDAP server is down
        """
        from rhodecode.lib.helpers import chop_at
        from ldap.filter import escape_filter_chars

        # uid is only used in log messages: the username with a trailing
        # "@<ldap host>" removed. LDAP_SERVER_ADDRESS is a list, so each host
        # has to be tried individually.
        uid = username
        for host in self.LDAP_SERVER_ADDRESS:
            host = host.strip()
            if host:
                uid = chop_at(uid, "@%s" % host)

        if not password:
            log.debug("Attempt to authenticate LDAP user "
                      "with blank password rejected.")
            raise LdapPasswordError()
        if "," in username:
            raise LdapUsernameError("invalid character in username: ,")
        try:
            if hasattr(ldap, 'OPT_X_TLS_CACERTDIR'):
                ldap.set_option(ldap.OPT_X_TLS_CACERTDIR,
                                '/etc/openldap/cacerts')
            ldap.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF)
            ldap.set_option(ldap.OPT_RESTART, ldap.OPT_ON)
            ldap.set_option(ldap.OPT_TIMEOUT, 20)
            ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
            ldap.set_option(ldap.OPT_TIMELIMIT, 15)
            if self.TLS_KIND != 'PLAIN':
                ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, self.TLS_REQCERT)
            server = ldap.initialize(self.LDAP_SERVER)
            # ``protocol_version`` is the attribute python-ldap maps to
            # OPT_PROTOCOL_VERSION; a plain ``protocol`` attribute is ignored.
            if self.ldap_version == 2:
                server.protocol_version = ldap.VERSION2
            else:
                server.protocol_version = ldap.VERSION3

            if self.TLS_KIND == 'START_TLS':
                server.start_tls_s()

            if self.LDAP_BIND_DN and self.LDAP_BIND_PASS:
                log.debug('Trying simple_bind with password and given DN: %s',
                          self.LDAP_BIND_DN)
                server.simple_bind_s(self.LDAP_BIND_DN, self.LDAP_BIND_PASS)

            # The username comes from the login form: escape filter
            # metacharacters so it cannot alter the search expression.
            filter_ = '(&%s(%s=%s))' % (self.LDAP_FILTER, self.attr_login,
                                        escape_filter_chars(username))
            log.debug("Authenticating %r filter %s at %s", self.BASE_DN,
                      filter_, self.LDAP_SERVER)
            lobjects = server.search_ext_s(self.BASE_DN, self.SEARCH_SCOPE,
                                           filter_)

            if not lobjects:
                raise ldap.NO_SUCH_OBJECT()

            for (dn, _attrs) in lobjects:
                # Results without a DN cannot be bound to (presumably search
                # references); skip them.
                if dn is None:
                    continue

                try:
                    log.debug('Trying simple bind with %s', dn)
                    server.simple_bind_s(dn, safe_str(password))
                    # Re-read the entry while bound as the user itself.
                    attrs = server.search_ext_s(dn, ldap.SCOPE_BASE,
                                                '(objectClass=*)')[0][1]
                    break
                except ldap.INVALID_CREDENTIALS:
                    log.debug("LDAP rejected password for user '%s' (%s): %s",
                              uid, username, dn)
            else:
                # Loop finished without ``break``: no entry accepted the
                # password.
                log.debug("No matching LDAP objects for authentication "
                          "of '%s' (%s)", uid, username)
                raise LdapPasswordError()

        except ldap.NO_SUCH_OBJECT:
            log.debug("LDAP says no such user '%s' (%s)", uid, username)
            raise LdapUsernameError()
        except ldap.SERVER_DOWN:
            raise LdapConnectionError("LDAP can't access authentication server")

        return dn, attrs
class RhodeCodeAuthPlugin(auth_modules.RhodeCodeExternalAuthPlugin):
    """
    Authentication plugin that validates logins against LDAP via AuthLdap.
    """

    def __init__(self):
        self._logger = logging.getLogger(__name__)
        # Allowed values for the corresponding "select" settings below.
        self._tls_kind_values = ["PLAIN", "LDAPS", "START_TLS"]
        self._tls_reqcert_values = ["NEVER", "ALLOW", "TRY", "DEMAND", "HARD"]
        self._search_scopes = ["BASE", "ONELEVEL", "SUBTREE"]

    @hybrid_property
    def name(self):
        """Name of this plugin."""
        return "ldap"

    def settings(self):
        """
        Return the list of setting definitions of this plugin.

        Each entry is a dict with the setting name, its validator, its form
        field type, a description and a form label; some entries also carry
        the selectable values and/or a default.
        """
        settings = [
            {
                "name": "host",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Host of the LDAP Server",
                "formname": "LDAP Host"
            },
            {
                "name": "port",
                "validator": self.validators.Number(strip=True, not_empty=True),
                "type": "string",
                "description": "Port that the LDAP server is listening on",
                "default": 389,
                "formname": "Port"
            },
            {
                "name": "dn_user",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "User to connect to LDAP",
                "formname": "Account"
            },
            {
                "name": "dn_pass",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "password",
                "description": "Password to connect to LDAP",
                "formname": "Password"
            },
            {
                "name": "tls_kind",
                "validator": self.validators.OneOf(self._tls_kind_values),
                "type": "select",
                "values": self._tls_kind_values,
                "description": "TLS Type",
                "default": 'PLAIN',
                "formname": "Connection Security"
            },
            {
                "name": "tls_reqcert",
                "validator": self.validators.OneOf(self._tls_reqcert_values),
                "type": "select",
                "values": self._tls_reqcert_values,
                "description": "Require Cert over TLS?",
                "formname": "Certificate Checks"
            },
            {
                "name": "base_dn",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Base DN to search (e.g., dc=mydomain,dc=com)",
                "formname": "Base DN"
            },
            {
                "name": "filter",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Filter to narrow results (e.g., ou=Users, etc)",
                "formname": "LDAP Search Filter"
            },
            {
                "name": "search_scope",
                "validator": self.validators.OneOf(self._search_scopes),
                "type": "select",
                "values": self._search_scopes,
                "description": "How deep to search LDAP",
                "formname": "LDAP Search Scope"
            },
            {
                "name": "attr_login",
                "validator": self.validators.AttrLoginValidator(not_empty=True, strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to user name",
                "formname": "Login Attribute"
            },
            {
                "name": "attr_firstname",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to first name",
                "formname": "First Name Attribute"
            },
            {
                "name": "attr_lastname",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to last name",
                "formname": "Last Name Attribute"
            },
            {
                "name": "attr_email",
                "validator": self.validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to email address",
                "formname": "Email Attribute"
            }
        ]
        return settings

    def use_fake_password(self):
        """Always return True for this plugin."""
        return True

    def user_activation_state(self):
        """
        Return True if 'hg.extern_activate.auto' is among the global
        permissions of the default user.
        """
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
        return 'hg.extern_activate.auto' in def_user_perms

    def auth(self, userobj, username, password, settings, **kwargs):
        """
        Given a user object (which may be null), username, a plaintext password,
        and a settings object (containing all the keys needed as listed in settings()),
        authenticate this user's login attempt.

        Return None on failure. On success, return a dictionary of the form:

            see: RhodeCodeAuthPluginBase.auth_func_attrs
        This is later validated for correctness
        """
        if not username or not password:
            log.debug('Empty username or password skipping...')
            return None

        # Separate name so the ``**kwargs`` parameter is not shadowed.
        ldap_kwargs = {
            'server': settings.get('host', ''),
            'base_dn': settings.get('base_dn', ''),
            'port': settings.get('port'),
            'bind_dn': settings.get('dn_user'),
            'bind_pass': settings.get('dn_pass'),
            'tls_kind': settings.get('tls_kind'),
            'tls_reqcert': settings.get('tls_reqcert'),
            'ldap_filter': settings.get('filter'),
            'search_scope': settings.get('search_scope'),
            'attr_login': settings.get('attr_login'),
            'ldap_version': 3,
        }

        # A bind DN without a bind password means: bind as the user who is
        # logging in, substituting the login name for "$login".
        if ldap_kwargs['bind_dn'] and not ldap_kwargs['bind_pass']:
            log.debug('Using dynamic binding.')
            ldap_kwargs['bind_dn'] = ldap_kwargs['bind_dn'].replace('$login', username)
            ldap_kwargs['bind_pass'] = password
        log.debug('Checking for ldap authentication')

        try:
            aldap = AuthLdap(**ldap_kwargs)
            (user_dn, ldap_attrs) = aldap.authenticate_ldap(username, password)
            log.debug('Got ldap DN response %s', user_dn)

            def get_ldap_attr(k):
                # First value of the LDAP attribute configured under setting
                # ``k``; '' when the attribute is missing or has no values.
                values = ldap_attrs.get(settings.get(k)) or ['']
                return values[0]

            # old attrs fetched from RhodeCode database
            admin = getattr(userobj, 'admin', False)
            active = getattr(userobj, 'active', True)
            email = getattr(userobj, 'email', '')
            firstname = getattr(userobj, 'firstname', '')
            lastname = getattr(userobj, 'lastname', '')
            extern_type = getattr(userobj, 'extern_type', '')

            user_attrs = {
                'username': username,
                'firstname': safe_unicode(get_ldap_attr('attr_firstname') or firstname),
                'lastname': safe_unicode(get_ldap_attr('attr_lastname') or lastname),
                'groups': [],
                # Fall back to the stored email when LDAP provides none (the
                # ``or`` used to sit inside the call and never took effect).
                'email': get_ldap_attr('attr_email') or email,
                'admin': admin,
                'active': active,
                "active_from_extern": None,
                'extern_name': user_dn,
                'extern_type': extern_type,
            }
            log.info('user %s authenticated correctly', user_attrs['username'])
            return user_attrs

        except (LdapUsernameError, LdapPasswordError, LdapImportError):
            log.error(traceback.format_exc())
            return None
        except Exception:
            # Any other failure is reported as a failed login rather than
            # propagated to the caller.
            log.error(traceback.format_exc())
            return None