# -*- coding: utf-8 -*-
"""
    rhodecode.lib.auth
    ~~~~~~~~~~~~~~~~~~

    authentication and permission libraries

    :created_on: Apr 4, 2010
    :copyright: (c) 2010 by marcink.
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
"""
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your option) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA  02110-1301, USA.

import bcrypt
import random
import logging
import traceback

from decorator import decorator

from pylons import config, session, url, request
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _

from rhodecode.lib.exceptions import LdapPasswordError, LdapUsernameError
from rhodecode.lib.utils import get_repo_slug
from rhodecode.lib.auth_ldap import AuthLdap

from rhodecode.model import meta
from rhodecode.model.user import UserModel
from rhodecode.model.db import User, RepoToPerm, Repository, Permission, \
    UserToPerm, UsersGroupToPerm, UsersGroupMember

log = logging.getLogger(__name__)

# Relative strength of repository permissions. fill_perms() uses it to decide
# whether a permission coming from a users group may replace the permission
# already computed for a repository, so every level needs a distinct weight
# (admin must outrank write).
PERM_WEIGHTS = {'repository.none': 0,
                'repository.read': 1,
                'repository.write': 3,
                'repository.admin': 4}


class PasswordGenerator(object):
    """This is a simple class for generating password from
        different sets of characters
        usage:
        passwd_gen = PasswordGenerator()
        #print 8-letter password containing only big and small letters
            of alphabet
        print passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
    """
    ALPHABETS_NUM = r'''1234567890'''  # [0]
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''  # [1]
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''  # [2]
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''  # [3]
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM + ALPHABETS_SPECIAL  # [4]
    ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM  # [5]
    ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
    ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM  # [6]
    ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM  # [7]

    def __init__(self, passwd=''):
        self.passwd = passwd

    def gen_password(self, len, type):
        """Generates a random password, stores it on the instance and
        returns it

        :param len: number of characters of the generated password
        :param type: string of characters to pick from, ie. one of the
            ALPHABETS_* class attributes
        """
        # passwords are secrets: draw them from the OS entropy source
        # instead of the predictable default Mersenne Twister generator
        rng = random.SystemRandom()
        # _i instead of _ so the imported translation function is not shadowed
        self.passwd = ''.join([rng.choice(type) for _i in xrange(len)])
        return self.passwd


def get_crypt_password(password):
    """Cryptographic function used for password hashing based on pybcrypt

    :param password: password to hash
    """
    return bcrypt.hashpw(password, bcrypt.gensalt(10))


def check_password(password, hashed):
    """Checks a plain text password against a bcrypt hash

    :param password: plain text password
    :param hashed: hash as returned by get_crypt_password; it is passed to
        bcrypt.hashpw as the salt argument
    """
    return bcrypt.hashpw(password, hashed) == hashed


def authfunc(environ, username, password):
    """Dummy authentication function used in Mercurial/Git/ and access control,

    :param environ: needed only for using in Basic auth
    """
    return authenticate(username, password)


def authenticate(username, password):
    """Authentication function used for access control,
    firstly checks for db authentication then if ldap is enabled for ldap
    authentication, also creates ldap user if not in database

    :param username: username
    :param password: password
    :returns: True if the credentials were accepted, False otherwise
    """
    user_model = UserModel()
    user = user_model.get_by_username(username, cache=False)

    log.debug('Authenticating user using RhodeCode account')
    if user is not None and not user.ldap_dn:
        if user.active:
            if user.username == 'default':
                # the anonymous account is accepted without a password check
                log.info('user %s authenticated correctly as anonymous user',
                         username)
                return True

            elif user.username == username and check_password(password,
                                                              user.password):
                log.info('user %s authenticated correctly', username)
                return True
        else:
            log.warning('user %s is disabled', username)

    else:
        log.debug('Regular authentication failed')
        user_obj = user_model.get_by_username(username, cache=False,
                                              case_insensitive=True)

        if user_obj is not None and not user_obj.ldap_dn:
            # same name in different case already exists as a regular
            # account, do not let ldap take that name over
            log.debug('this user already exists as non ldap')
            return False

        from rhodecode.model.settings import SettingsModel
        ldap_settings = SettingsModel().get_ldap_settings()

        #======================================================================
        # FALLBACK TO LDAP AUTH IF ENABLE
        #======================================================================
        if ldap_settings.get('ldap_active', False):
            log.debug("Authenticating user using ldap")
            kwargs = {
                'server': ldap_settings.get('ldap_host', ''),
                'base_dn': ldap_settings.get('ldap_base_dn', ''),
                'port': ldap_settings.get('ldap_port'),
                'bind_dn': ldap_settings.get('ldap_dn_user'),
                'bind_pass': ldap_settings.get('ldap_dn_pass'),
                'use_ldaps': ldap_settings.get('ldap_ldaps'),
                'tls_reqcert': ldap_settings.get('ldap_tls_reqcert'),
                'ldap_filter': ldap_settings.get('ldap_filter'),
                'search_scope': ldap_settings.get('ldap_search_scope'),
                'attr_login': ldap_settings.get('ldap_attr_login'),
                'ldap_version': 3,
            }
            log.debug('Checking for ldap authentication')
            try:
                aldap = AuthLdap(**kwargs)
                (user_dn, ldap_attrs) = aldap.authenticate_ldap(username,
                                                                password)
                log.debug('Got ldap DN response %s', user_dn)

                user_attrs = {
                    'name': ldap_attrs[ldap_settings\
                                       .get('ldap_attr_firstname')][0],
                    'lastname': ldap_attrs[ldap_settings\
                                           .get('ldap_attr_lastname')][0],
                    'email': ldap_attrs[ldap_settings\
                                        .get('ldap_attr_email')][0],
                }

                if user_model.create_ldap(username, password, user_dn,
                                          user_attrs):
                    log.info('created new ldap user %s', username)

                # ldap accepted the credentials; this holds as well for
                # users that already had their account created earlier
                return True
            except (LdapUsernameError, LdapPasswordError,):
                # wrong credentials, fall through to the final return False
                pass
            except (Exception,):
                # anything else (connection problems, missing attributes...)
                # is logged and treated as a failed authentication
                log.error(traceback.format_exc())
    return False


class AuthUser(object):
    """A simple object that handles a mercurial username for authentication
    """

    def __init__(self):
        self.username = 'None'
        self.name = ''
        self.lastname = ''
        self.email = ''
        self.user_id = None
        self.is_authenticated = False
        self.is_admin = False
        self.permissions = {}

    def __repr__(self):
        return "<AuthUser('id:%s:%s')>" % (self.user_id, self.username)


def set_available_permissions(config):
    """This function will propagate pylons globals with all available defined
    permission given in db. We don't want to check each time from db for new
    permissions since adding a new permission also requires application restart
    ie. to decorate new views with the newly created permission

    :param config: current pylons config instance
    """
    log.info('getting information about all available permissions')
    try:
        sa = meta.Session()
        all_perms = sa.query(Permission).all()
    except Exception:
        # do not hide the failure: without the permission list the lookup
        # below cannot work, so report the real error and propagate it
        log.error(traceback.format_exc())
        raise
    finally:
        meta.Session.remove()

    config['available_permissions'] = [x.permission_name for x in all_perms]


def fill_perms(user):
    """Fills user permission attribute with permissions taken from database
    works for permissions given for repositories, and for permissions that
    are granted as part of being a users group member

    :param user: user instance to fill his perms
    :returns: the same user instance with permissions['repositories'] and
        permissions['global'] filled in
    """
    sa = meta.Session()
    user.permissions['repositories'] = {}
    user.permissions['global'] = set()

    # the session is released in finally so it is not left behind when
    # one of the queries below fails
    try:
        #=======================================================================
        # fetch default permissions
        #=======================================================================
        default_user = UserModel().get_by_username('default', cache=True)

        default_perms = sa.query(RepoToPerm, Repository, Permission)\
            .join((Repository, RepoToPerm.repository_id == Repository.repo_id))\
            .join((Permission, RepoToPerm.permission_id == Permission.permission_id))\
            .filter(RepoToPerm.user == default_user).all()

        if user.is_admin:
            #===================================================================
            # admin have all default rights set to admin
            #===================================================================
            user.permissions['global'].add('hg.admin')

            for perm in default_perms:
                p = 'repository.admin'
                user.permissions['repositories'][perm.RepoToPerm.repository.repo_name] = p

        else:
            #===================================================================
            # set default permissions
            #===================================================================

            #default global
            default_global_perms = sa.query(UserToPerm)\
                .filter(UserToPerm.user == sa.query(User)\
                        .filter(User.username == 'default').one())

            for perm in default_global_perms:
                user.permissions['global'].add(perm.permission.permission_name)

            #default for repositories
            for perm in default_perms:
                if perm.Repository.private and not perm.Repository.user_id == user.user_id:
                    #disable defaults for private repos,
                    p = 'repository.none'
                elif perm.Repository.user_id == user.user_id:
                    #set admin if owner
                    p = 'repository.admin'
                else:
                    p = perm.Permission.permission_name

                user.permissions['repositories'][perm.RepoToPerm.repository.repo_name] = p

            #===================================================================
            # overwrite default with user permissions if any
            #===================================================================
            user_perms = sa.query(RepoToPerm, Permission, Repository)\
                .join((Repository, RepoToPerm.repository_id == Repository.repo_id))\
                .join((Permission, RepoToPerm.permission_id == Permission.permission_id))\
                .filter(RepoToPerm.user_id == user.user_id).all()

            for perm in user_perms:
                if perm.Repository.user_id == user.user_id:
                    #set admin if owner
                    p = 'repository.admin'
                else:
                    p = perm.Permission.permission_name
                user.permissions['repositories'][perm.RepoToPerm.repository.repo_name] = p

            #===================================================================
            # check if user is part of groups for this repository and fill in
            # (or replace with higher) permissions
            #===================================================================
            user_perms_from_users_groups = sa.query(UsersGroupToPerm, Permission, Repository,)\
                .join((Repository, UsersGroupToPerm.repository_id == Repository.repo_id))\
                .join((Permission, UsersGroupToPerm.permission_id == Permission.permission_id))\
                .join((UsersGroupMember, UsersGroupToPerm.users_group_id == UsersGroupMember.users_group_id))\
                .filter(UsersGroupMember.user_id == user.user_id).all()

            for perm in user_perms_from_users_groups:
                p = perm.Permission.permission_name
                repo_name = perm.UsersGroupToPerm.repository.repo_name
                # a repository without any entry so far counts as
                # 'repository.none', so the group permission fills it in
                cur_perm = user.permissions['repositories']\
                    .get(repo_name, 'repository.none')
                #overwrite permission only if it's greater than permission
                #given from other sources
                if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]:
                    user.permissions['repositories'][repo_name] = p
    finally:
        meta.Session.remove()
    return user


def get_user(session):
    """Gets user from session, and wraps permissions into user

    :param session: session to read the 'rhodecode_user' from; the
        refreshed user is stored back under the same key and the session
        is saved
    """
    user = session.get('rhodecode_user', AuthUser())
    #if the user is not logged in we check for anonymous access
    #if user is logged and it's a default user check if we still have anonymous
    #access enabled
    if user.user_id is None or user.username == 'default':
        anonymous_user = UserModel().get_by_username('default', cache=True)
        # a missing default account is handled like a disabled one
        if anonymous_user is not None and anonymous_user.active is True:
            #then we set this user is logged in
            user.is_authenticated = True
            user.user_id = anonymous_user.user_id
        else:
            user.is_authenticated = False

    if user.is_authenticated:
        user = UserModel().fill_data(user)

    user = fill_perms(user)
    session['rhodecode_user'] = user
    session.save()
    return user


def _get_came_from(environ):
    """Rebuilds the requested location (script name + path + query string)
    from a WSGI environ; it is handed to the login page as `came_from`

    :param environ: WSGI environ of the current request
    """
    came_from = ''
    # missing keys are treated as empty strings instead of failing on None
    script_name = environ.get('SCRIPT_NAME') or ''
    if script_name != '/':
        came_from += script_name

    came_from += environ.get('PATH_INFO') or ''

    query_string = environ.get('QUERY_STRING')
    if query_string:
        came_from += '?' + query_string
    return came_from


#===============================================================================
# CHECK DECORATORS
#===============================================================================
class LoginRequired(object):
    """Must be logged in to execute this function else
    redirect to login page"""

    def __call__(self, func):
        return decorator(self.__wrapper, func)

    def __wrapper(self, func, *fargs, **fkwargs):
        user = session.get('rhodecode_user', AuthUser())
        log.debug('Checking login required for user:%s', user.username)
        if user.is_authenticated:
            log.debug('user %s is authenticated', user.username)
            return func(*fargs, **fkwargs)
        else:
            log.warning('user %s not authenticated', user.username)

            p = _get_came_from(request.environ)

            log.debug('redirecting to login page with %s', p)
            return redirect(url('login_home', came_from=p))


class NotAnonymous(object):
    """Must be logged in as a registered (non anonymous) user to execute
    this function else redirect to login page"""

    def __call__(self, func):
        return decorator(self.__wrapper, func)

    def __wrapper(self, func, *fargs, **fkwargs):
        user = session.get('rhodecode_user', AuthUser())
        log.debug('Checking if user is not anonymous')

        anonymous = user.username == 'default'

        if anonymous:
            p = _get_came_from(request.environ)

            import rhodecode.lib.helpers as h
            h.flash(_('You need to be a registered user to perform this action'),
                    category='warning')
            return redirect(url('login_home', came_from=p))
        else:
            return func(*fargs, **fkwargs)


class PermsDecorator(object):
    """Base class for decorators"""

    def __init__(self, *required_perms):
        available_perms = config['available_permissions']
        for perm in required_perms:
            if perm not in available_perms:
                raise Exception("'%s' permission is not defined" % perm)
        self.required_perms = set(required_perms)
        self.user_perms = None

    def __call__(self, func):
        return decorator(self.__wrapper, func)

    def __wrapper(self, func, *fargs, **fkwargs):
        self.user = session.get('rhodecode_user', AuthUser())
        self.user_perms = self.user.permissions
        log.debug('checking %s permissions %s for %s %s',
                  self.__class__.__name__, self.required_perms,
                  func.__name__, self.user)

        if self.check_permissions():
            log.debug('Permission granted for %s %s', func.__name__, self.user)
            return func(*fargs, **fkwargs)
        else:
            log.warning('Permission denied for %s %s', func.__name__, self.user)
            #redirect with forbidden ret code
            return abort(403)

    def check_permissions(self):
        """Dummy function for overriding"""
        raise NotImplementedError('You have to write this function '
                                  'in child class')


class HasPermissionAllDecorator(PermsDecorator):
    """Checks for access permission for all given predicates. All of them
    have to be met in order to fulfill the request
    """

    def check_permissions(self):
        # users without filled global permissions simply have none
        if self.required_perms.issubset(self.user_perms.get('global', set())):
            return True
        return False


class HasPermissionAnyDecorator(PermsDecorator):
    """Checks for access permission for any of given predicates. In order to
    fulfill the request any of predicates must be met
    """

    def check_permissions(self):
        # users without filled global permissions simply have none
        if self.required_perms.intersection(self.user_perms.get('global',
                                                                 set())):
            return True
        return False


class HasRepoPermissionAllDecorator(PermsDecorator):
    """Checks for access permission for all given predicates for specific
    repository. All of them have to be met in order to fulfill the request
    """

    def check_permissions(self):
        repo_name = get_repo_slug(request)
        try:
            user_perms = set([self.user_perms['repositories'][repo_name]])
        except KeyError:
            # no permission known for this repository
            return False
        if self.required_perms.issubset(user_perms):
            return True
        return False


class HasRepoPermissionAnyDecorator(PermsDecorator):
    """Checks for access permission for any of given predicates for specific
    repository. In order to fulfill the request any of predicates must be met
    """

    def check_permissions(self):
        repo_name = get_repo_slug(request)

        try:
            user_perms = set([self.user_perms['repositories'][repo_name]])
        except KeyError:
            # no permission known for this repository
            return False
        if self.required_perms.intersection(user_perms):
            return True
        return False


#===============================================================================
# CHECK FUNCTIONS
#===============================================================================
class PermsFunction(object):
    """Base function for other check functions"""

    def __init__(self, *perms):
        available_perms = config['available_permissions']

        for perm in perms:
            if perm not in available_perms:
                raise Exception("'%s' permission is not defined" % perm)
        self.required_perms = set(perms)
        self.user_perms = None
        self.granted_for = ''
        self.repo_name = None

    def __call__(self, check_Location=''):
        user = session.get('rhodecode_user', False)
        if not user:
            return False
        self.user_perms = user.permissions
        self.granted_for = user.username
        log.debug('checking %s %s %s', self.__class__.__name__,
                  self.required_perms, user)

        if self.check_permissions():
            log.debug('Permission granted for %s @ %s %s', self.granted_for,
                      check_Location, user)
            return True

        else:
            log.warning('Permission denied for %s @ %s %s', self.granted_for,
                        check_Location, user)
            return False

    def check_permissions(self):
        """Dummy function for overriding"""
        raise NotImplementedError('You have to write this function '
                                  'in child class')


class HasPermissionAll(PermsFunction):
    """Grants access only if the user has all of the given global
    permissions"""

    def check_permissions(self):
        # users without filled global permissions simply have none
        if self.required_perms.issubset(self.user_perms.get('global', set())):
            return True
        return False


class HasPermissionAny(PermsFunction):
    """Grants access if the user has at least one of the given global
    permissions"""

    def check_permissions(self):
        # users without filled global permissions simply have none
        if self.required_perms.intersection(self.user_perms.get('global',
                                                                 set())):
            return True
        return False


class HasRepoPermissionAll(PermsFunction):
    """Grants access only if the permission of the user on the repository
    covers all of the given permissions"""

    def __call__(self, repo_name=None, check_Location=''):
        self.repo_name = repo_name
        return super(HasRepoPermissionAll, self).__call__(check_Location)

    def check_permissions(self):
        if not self.repo_name:
            # no explicit repository given, take it from the current request
            self.repo_name = get_repo_slug(request)

        try:
            self.user_perms = set([self.user_perms['repositories']\
                                   [self.repo_name]])
        except KeyError:
            # no permission known for this repository
            return False
        self.granted_for = self.repo_name
        if self.required_perms.issubset(self.user_perms):
            return True
        return False


class HasRepoPermissionAny(PermsFunction):
    """Grants access if the permission of the user on the repository is one
    of the given permissions"""

    def __call__(self, repo_name=None, check_Location=''):
        self.repo_name = repo_name
        return super(HasRepoPermissionAny, self).__call__(check_Location)

    def check_permissions(self):
        if not self.repo_name:
            # no explicit repository given, take it from the current request
            self.repo_name = get_repo_slug(request)

        try:
            self.user_perms = set([self.user_perms['repositories']\
                                   [self.repo_name]])
        except KeyError:
            # no permission known for this repository
            return False
        self.granted_for = self.repo_name
        if self.required_perms.intersection(self.user_perms):
            return True
        return False


#===============================================================================
# SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
#===============================================================================
class HasPermissionAnyMiddleware(object):
    """Permission check that gets the user and the repository name passed
    in explicitly instead of reading them from the session and request"""

    def __init__(self, *perms):
        self.required_perms = set(perms)

    def __call__(self, user, repo_name):
        usr = AuthUser()
        usr.user_id = user.user_id
        usr.username = user.username
        usr.is_admin = user.admin

        try:
            self.user_perms = set([fill_perms(usr)\
                                   .permissions['repositories'][repo_name]])
        except KeyError:
            # no permission known for this repository
            self.user_perms = set()
        except Exception:
            # failing to compute permissions must deny access, but the
            # reason should be visible in the logs
            log.error(traceback.format_exc())
            self.user_perms = set()
        self.granted_for = ''
        self.username = user.username
        self.repo_name = repo_name
        return self.check_permissions()

    def check_permissions(self):
        log.debug('checking mercurial protocol '
                  'permissions %s for user:%s repository:%s', self.user_perms,
                  self.username, self.repo_name)
        if self.required_perms.intersection(self.user_perms):
            log.debug('permission granted')
            return True
        log.debug('permission denied')
        return False