# -*- coding: utf-8 -*-

# Copyright (C) 2010-2019 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
users model for RhodeCode
"""

import logging
import traceback
import datetime
import ipaddress

from pyramid.threadlocal import get_current_request
from sqlalchemy.exc import DatabaseError

from rhodecode import events
from rhodecode.lib.user_log_filter import user_log_filter
from rhodecode.lib.utils2 import (
    safe_unicode, get_current_rhodecode_user, action_logger_generic,
    AttributeDict, str2bool)
from rhodecode.lib.exceptions import (
    DefaultUserException, UserOwnsReposException, UserOwnsRepoGroupsException,
    UserOwnsUserGroupsException, NotAllowedToCreateUserError)
from rhodecode.lib.caching_query import FromCache
from rhodecode.model import BaseModel
from rhodecode.model.auth_token import AuthTokenModel
from rhodecode.model.db import (
    _hash_key, true, false, or_, joinedload, User, UserToPerm,
    UserEmailMap, UserIpMap, UserLog)
from rhodecode.model.meta import Session
from rhodecode.model.repo_group import RepoGroupModel


log = logging.getLogger(__name__)


class UserModel(BaseModel):
    """
    Model layer for creating, updating, deleting and querying `User` objects
    and the records attached to them (permissions, extra emails, extra IPs,
    auth tokens).
    """
    cls = User

    def get(self, user_id, cache=False):
        """
        Fetch a user by primary key.

        :param user_id: id of the user to load
        :param cache: when true, attach the `sql_cache_short` FromCache
            option to the query
        """
        user = self.sa.query(User)
        if cache:
            user = user.options(
                FromCache("sql_cache_short", "get_user_%s" % user_id))
        return user.get(user_id)

    def get_user(self, user):
        """
        Public wrapper around `_get_user`.

        :param user: value passed through to `self._get_user`
        """
        return self._get_user(user)

    def _serialize_user(self, user):
        """
        Build a plain dict describing `user` (ids, names, email, gravatar and
        profile links, display values and the active flag).

        :param user: User instance
        """
        import rhodecode.lib.helpers as h

        return {
            'id': user.user_id,
            'first_name': user.first_name,
            'last_name': user.last_name,
            'username': user.username,
            'email': user.email,
            'icon_link': h.gravatar_url(user.email, 30),
            'profile_link': h.link_to_user(user),
            'value_display': h.escape(h.person(user)),
            'value': user.username,
            'value_type': 'user',
            'active': user.active,
        }

    def get_users(self, name_contains=None, limit=20, only_active=True):
        """
        Return serialized users, optionally filtered by a name fragment.

        :param name_contains: case-insensitive fragment matched against
            first name, last name and username
        :param limit: maximum number of rows returned for a filtered query
        :param only_active: when true, only active users are returned
        :returns: list of dicts produced by `_serialize_user`
        """
        query = self.sa.query(User)
        if only_active:
            query = query.filter(User.active == true())

        if name_contains:
            ilike_expression = u'%{}%'.format(safe_unicode(name_contains))
            query = query.filter(
                or_(
                    User.name.ilike(ilike_expression),
                    User.lastname.ilike(ilike_expression),
                    User.username.ilike(ilike_expression)
                )
            )
            # NOTE(review): the limit is applied only together with the name
            # filter - confirm this is intended for unfiltered listings
            query = query.limit(limit)
        users = query.all()

        _users = [
            self._serialize_user(user) for user in users
        ]
        return _users

    def get_by_username(self, username, cache=False, case_insensitive=False):
        """
        Fetch a single user by username.

        :param username: username to look up
        :param cache: when true, attach the `sql_cache_short` FromCache
            option keyed on a hash of the username
        :param case_insensitive: when true, match with `ilike` instead of
            equality
        """
        if case_insensitive:
            user = self.sa.query(User).filter(User.username.ilike(username))
        else:
            user = self.sa.query(User)\
                .filter(User.username == username)
        if cache:
            name_key = _hash_key(username)
            user = user.options(
                FromCache("sql_cache_short", "get_user_%s" % name_key))
        return user.scalar()

    def get_by_email(self, email, cache=False, case_insensitive=False):
        """
        Delegate to `User.get_by_email`.

        :param email: email address to look up
        :param cache: passed through to `User.get_by_email`
        :param case_insensitive: passed through to `User.get_by_email`
        """
        return User.get_by_email(email, case_insensitive, cache)

    def get_by_auth_token(self, auth_token, cache=False):
        """
        Delegate to `User.get_by_auth_token`.

        :param auth_token: token to look up
        :param cache: passed through to `User.get_by_auth_token`
        """
        return User.get_by_auth_token(auth_token, cache)

    def get_active_user_count(self, cache=False):
        """
        Count active users, excluding the default user.

        :param cache: when true, attach the `sql_cache_short` FromCache
            option to the query
        """
        qry = User.query().filter(
            User.active == true()).filter(
            User.username != User.DEFAULT_USER)
        if cache:
            qry = qry.options(
                FromCache("sql_cache_short", "get_active_users"))
        return qry.count()

    def create(self, form_data, cur_user=None):
        """
        Create (or update) a user from form data via `create_or_update`.
        The `admin` flag is always set to False here.

        :param form_data: dict with `username`, `password`, `email`,
            `firstname`, `lastname`, `active`, `extern_type`, `extern_name`
            and optionally `create_repo_group` and `password_change`
        :param cur_user: username performing the action; defaults to the
            username of the current RhodeCode user
        """
        if not cur_user:
            cur_user = getattr(get_current_rhodecode_user(), 'username', None)

        user_data = {
            'username': form_data['username'],
            'password': form_data['password'],
            'email': form_data['email'],
            'firstname': form_data['firstname'],
            'lastname': form_data['lastname'],
            'active': form_data['active'],
            'extern_type': form_data['extern_type'],
            'extern_name': form_data['extern_name'],
            'admin': False,
            'cur_user': cur_user
        }

        if 'create_repo_group' in form_data:
            user_data['create_repo_group'] = str2bool(
                form_data.get('create_repo_group'))

        try:
            if form_data.get('password_change'):
                user_data['force_password_change'] = True
            return UserModel().create_or_update(**user_data)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def update_user(self, user, skip_attrs=None, **kwargs):
        """
        Update an existing user. Current attribute values are used as
        defaults and are overridden by `kwargs`.

        :param user: value resolved through `self._get_user`
        :param skip_attrs: optional collection of kwarg names to ignore
        :param kwargs: attributes to change, forwarded to `create_or_update`
        :raises DefaultUserException: when the target is the default user
        """
        from rhodecode.lib.auth import get_crypt_password

        user = self._get_user(user)
        if user.username == User.DEFAULT_USER:
            raise DefaultUserException(
                "You can't edit this user (`%(username)s`) since it's "
                "crucial for entire application" % {
                    'username': user.username})

        # first store only defaults
        user_attrs = {
            'updating_user_id': user.user_id,
            'username': user.username,
            'password': user.password,
            'email': user.email,
            'firstname': user.name,
            'lastname': user.lastname,
            'active': user.active,
            'admin': user.admin,
            'extern_name': user.extern_name,
            'extern_type': user.extern_type,
            'language': user.user_data.get('language')
        }

        # in case there's new_password, that comes from form, use it to
        # store password
        if kwargs.get('new_password'):
            kwargs['password'] = kwargs['new_password']

        # cleanups, my_account password change form
        kwargs.pop('current_password', None)
        kwargs.pop('new_password', None)

        # cleanups, user edit password change form
        kwargs.pop('password_confirmation', None)
        kwargs.pop('password_change', None)

        # create repo group on user creation
        kwargs.pop('create_repo_group', None)

        # legacy forms send name, which is the firstname
        firstname = kwargs.pop('name', None)
        if firstname:
            kwargs['firstname'] = firstname

        for k, v in kwargs.items():
            # skip if we don't want to update this
            if skip_attrs and k in skip_attrs:
                continue

            user_attrs[k] = v

        try:
            return self.create_or_update(**user_attrs)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def create_or_update(
            self, username, password, email, firstname='', lastname='',
            active=True, admin=False, extern_type=None, extern_name=None,
            cur_user=None, plugin=None, force_password_change=False,
            allow_to_create_user=True, create_repo_group=None,
            updating_user_id=None, language=None, strict_creation_check=True):
        """
        Creates a new instance if not found, or updates current one

        :param username:
        :param password:
        :param email:
        :param firstname:
        :param lastname:
        :param active:
        :param admin:
        :param extern_type:
        :param extern_name:
        :param cur_user:
        :param plugin: optional plugin this method was called from
        :param force_password_change: toggles new or existing user flag
            for password change
        :param allow_to_create_user: Defines if the method can actually create
            new users
        :param create_repo_group: Defines if the method should also
            create a repo group with user name, and owner
        :param updating_user_id: if we set it up this is the user we want to
            update; this allows editing the username.
        :param language: language of user from interface.
        :param strict_creation_check: when true, creating an active user
            first runs `check_allowed_create_user`
        :raises NotAllowedToCreateUserError: when no user is found and
            `allow_to_create_user` is false
        :raises DefaultUserException: when the matched user is the default
            user

        :returns: new User object with injected `is_new_user` attribute.
        """

        if not cur_user:
            cur_user = getattr(get_current_rhodecode_user(), 'username', None)

        from rhodecode.lib.auth import (
            get_crypt_password, check_password, generate_auth_token)
        from rhodecode.lib.hooks_base import (
            log_create_user, check_allowed_create_user)

        def _password_change(new_user, password):
            old_password = new_user.password or ''
            # empty password
            if not old_password:
                return False

            # password check is only needed for RhodeCode internal auth calls
            # in case it's a plugin we don't care
            if not plugin:

                # first check if we gave crypted password back, and if it
                # matches it's not password change
                if new_user.password == password:
                    return False

                password_match = check_password(password, old_password)
                if not password_match:
                    return True

            return False

        # read settings on default personal repo group creation
        if create_repo_group is None:
            default_create_repo_group = RepoGroupModel()\
                .get_default_create_personal_repo_group()
            create_repo_group = default_create_repo_group

        user_data = {
            'username': username,
            'password': password,
            'email': email,
            'firstname': firstname,
            'lastname': lastname,
            'active': active,
            'admin': admin
        }

        if updating_user_id:
            log.debug('Checking for existing account in RhodeCode '
                      'database with user_id `%s` ', updating_user_id)
            user = User.get(updating_user_id)
        else:
            log.debug('Checking for existing account in RhodeCode '
                      'database with username `%s` ', username)
            user = User.get_by_username(username, case_insensitive=True)

        if user is None:
            # we check internal flag if this method is actually allowed to
            # create new user
            if not allow_to_create_user:
                msg = ('Method wants to create new user, but it is not '
                       'allowed to do so')
                log.warning(msg)
                raise NotAllowedToCreateUserError(msg)

            log.debug('Creating new user %s', username)

            # only if we create user that is active
            new_active_user = active
            if new_active_user and strict_creation_check:
                # raises UserCreationError if it's not allowed for any reason to
                # create new active user, this also executes pre-create hooks
                check_allowed_create_user(user_data, cur_user, strict_check=True)
            events.trigger(events.UserPreCreate(user_data))
            new_user = User()
            edit = False
        else:
            log.debug('updating user `%s`', username)
            events.trigger(events.UserPreUpdate(user, user_data))
            new_user = user
            edit = True

            # we're not allowed to edit default user
            if user.username == User.DEFAULT_USER:
                raise DefaultUserException(
                    "You can't edit this user (`%(username)s`) since it's "
                    "crucial for entire application"
                    % {'username': user.username})

        # inject special attribute that will tell us if User is new or old
        new_user.is_new_user = not edit
        # for users that didn't specify auth type, we use RhodeCode built in
        from rhodecode.authentication.plugins import auth_rhodecode
        extern_name = extern_name or auth_rhodecode.RhodeCodeAuthPlugin.uid
        extern_type = extern_type or auth_rhodecode.RhodeCodeAuthPlugin.uid

        try:
            new_user.username = username
            new_user.admin = admin
            new_user.email = email
            new_user.active = active
            new_user.extern_name = safe_unicode(extern_name)
            new_user.extern_type = safe_unicode(extern_type)
            new_user.name = firstname
            new_user.lastname = lastname

            # set password only if creating an user or password is changed
            if not edit or _password_change(new_user, password):
                reason = 'new password' if edit else 'new user'
                log.debug('Updating password reason=>%s', reason)
                new_user.password = get_crypt_password(password) if password else None

            if force_password_change:
                new_user.update_userdata(force_password_change=True)
            if language:
                new_user.update_userdata(language=language)
            new_user.update_userdata(notification_status=True)

            self.sa.add(new_user)

            if not edit and create_repo_group:
                RepoGroupModel().create_personal_repo_group(
                    new_user, commit_early=False)

            if not edit:
                # add the RSS token
                self.add_auth_token(
                    user=username, lifetime_minutes=-1,
                    role=self.auth_token_role.ROLE_FEED,
                    description=u'Generated feed token')

                # NOTE(review): creation log + post-create event are kept in
                # the creation-only branch - confirm against upstream
                kwargs = new_user.get_dict()
                # backward compat, require api_keys present
                kwargs['api_keys'] = kwargs['auth_tokens']
                log_create_user(created_by=cur_user, **kwargs)
                events.trigger(events.UserPostCreate(user_data))
            return new_user
        except (DatabaseError,):
            log.error(traceback.format_exc())
            raise

    def create_registration(self, form_data,
                            extern_name='rhodecode', extern_type='rhodecode'):
        """
        Register a new (non-admin) user from `form_data` and create a
        registration notification.

        :param form_data: dict of user fields; `admin`, `extern_name` and
            `extern_type` are overwritten in place
        :param extern_name: stored in `form_data['extern_name']`
        :param extern_type: stored in `form_data['extern_type']`
        :returns: the created user
        """
        from rhodecode.model.notification import NotificationModel
        from rhodecode.model.notification import EmailNotificationModel

        try:
            form_data['admin'] = False
            form_data['extern_name'] = extern_name
            form_data['extern_type'] = extern_type
            new_user = self.create(form_data)

            self.sa.add(new_user)
            self.sa.flush()

            user_data = new_user.get_dict()
            kwargs = {
                # use SQLALCHEMY safe dump of user data
                'user': AttributeDict(user_data),
                'date': datetime.datetime.now()
            }
            notification_type = EmailNotificationModel.TYPE_REGISTRATION
            # pre-generate the subject for notification itself
            (subject,
             _h, _e,  # we don't care about those
             body_plaintext) = EmailNotificationModel().render_email(
                notification_type, **kwargs)

            # create notification objects, and emails
            NotificationModel().create(
                created_by=new_user,
                notification_subject=subject,
                notification_body=body_plaintext,
                notification_type=notification_type,
                recipients=None,  # all admins
                email_kwargs=kwargs,
            )

            return new_user
        except Exception:
            log.error(traceback.format_exc())
            raise

    def _handle_user_repos(self, username, repositories, handle_mode=None):
        """
        Detach or delete repositories owned by a user that is being removed.

        :param username: name of the user, appended to the description of
            detached repositories
        :param repositories: iterable of repository objects to process
        :param handle_mode: `'detach'` reassigns them to the first super
            admin, `'delete'` removes them, anything else does nothing
        :returns: True when nothing was handled (left-overs remain)
        """
        _superadmin = self.cls.get_first_super_admin()
        left_overs = True

        from rhodecode.model.repo import RepoModel

        if handle_mode == 'detach':
            for obj in repositories:
                obj.user = _superadmin
                # record in the description why the super admin now owns
                # these formerly orphaned repositories
                obj.description += ' \n::detached repository from deleted user: %s' % (username,)
                self.sa.add(obj)
            left_overs = False
        elif handle_mode == 'delete':
            for obj in repositories:
                RepoModel().delete(obj, forks='detach')
            left_overs = False

        # if nothing is done we have left overs left
        return left_overs

    def _handle_user_repo_groups(self, username, repository_groups,
                                 handle_mode=None):
        """
        Detach or delete repository groups owned by a user that is being
        removed.

        :param username: name of the user, appended to the description of
            detached groups
        :param repository_groups: iterable of repository group objects
        :param handle_mode: `'detach'` reassigns them to the first super
            admin and clears the `personal` flag, `'delete'` removes them,
            anything else does nothing
        :returns: True when nothing was handled (left-overs remain)
        """
        _superadmin = self.cls.get_first_super_admin()
        left_overs = True

        from rhodecode.model.repo_group import RepoGroupModel

        if handle_mode == 'detach':
            for r in repository_groups:
                r.user = _superadmin
                # record in the description why the super admin now owns
                # these formerly orphaned repository groups
                r.group_description += ' \n::detached repository group from deleted user: %s' % (username,)
                r.personal = False
                self.sa.add(r)
            left_overs = False
        elif handle_mode == 'delete':
            for r in repository_groups:
                RepoGroupModel().delete(r)
            left_overs = False

        # if nothing is done we have left overs left
        return left_overs

    def _handle_user_user_groups(self, username, user_groups, handle_mode=None):
        """
        Detach or delete user groups owned by a user that is being removed.

        :param username: name of the user, appended to the description of
            detached groups
        :param user_groups: iterable of user group objects
        :param handle_mode: `'detach'` reassigns ownership (and the user's
            own permission entries on the group) to the first super admin,
            `'delete'` removes the groups, anything else does nothing
        :returns: True when nothing was handled (left-overs remain)
        """
        _superadmin = self.cls.get_first_super_admin()
        left_overs = True

        from rhodecode.model.user_group import UserGroupModel

        if handle_mode == 'detach':
            for r in user_groups:
                for user_user_group_to_perm in r.user_user_group_to_perm:
                    if user_user_group_to_perm.user.username == username:
                        user_user_group_to_perm.user = _superadmin
                r.user = _superadmin
                # record in the description why the super admin now owns
                # these formerly orphaned user groups
                r.user_group_description += ' \n::detached user group from deleted user: %s' % (username,)
                self.sa.add(r)
            left_overs = False
        elif handle_mode == 'delete':
            for r in user_groups:
                UserGroupModel().delete(r)
            left_overs = False

        # if nothing is done we have left overs left
        return left_overs

    def delete(self, user, cur_user=None, handle_repos=None,
               handle_repo_groups=None, handle_user_groups=None):
        """
        Delete a user, after optionally detaching or deleting the objects
        the user owns.

        :param user: value resolved through `self._get_user`
        :param cur_user: username performing the deletion; defaults to the
            username of the current RhodeCode user
        :param handle_repos: handle mode for owned repositories
            (`'detach'`, `'delete'` or None)
        :param handle_repo_groups: handle mode for owned repository groups
        :param handle_user_groups: handle mode for owned user groups
        :raises DefaultUserException: when the target is the default user
        :raises UserOwnsReposException: when repositories are left unhandled
        :raises UserOwnsRepoGroupsException: when repository groups are left
            unhandled
        :raises UserOwnsUserGroupsException: when user groups are left
            unhandled
        """
        if not cur_user:
            cur_user = getattr(
                get_current_rhodecode_user(), 'username', None)
        user = self._get_user(user)

        try:
            if user.username == User.DEFAULT_USER:
                raise DefaultUserException(
                    u"You can't remove this user since it's"
                    u" crucial for entire application")

            left_overs = self._handle_user_repos(
                user.username, user.repositories, handle_repos)
            if left_overs and user.repositories:
                repos = [x.repo_name for x in user.repositories]
                raise UserOwnsReposException(
                    u'user "%(username)s" still owns %(len_repos)s repositories and cannot be '
                    u'removed. Switch owners or remove those repositories:%(list_repos)s'
                    % {'username': user.username, 'len_repos': len(repos),
                       'list_repos': ', '.join(repos)})

            left_overs = self._handle_user_repo_groups(
                user.username, user.repository_groups, handle_repo_groups)
            if left_overs and user.repository_groups:
                repo_groups = [x.group_name for x in user.repository_groups]
                raise UserOwnsRepoGroupsException(
                    u'user "%(username)s" still owns %(len_repo_groups)s repository groups and cannot be '
                    u'removed. Switch owners or remove those repository groups:%(list_repo_groups)s'
                    % {'username': user.username, 'len_repo_groups': len(repo_groups),
                       'list_repo_groups': ', '.join(repo_groups)})

            left_overs = self._handle_user_user_groups(
                user.username, user.user_groups, handle_user_groups)
            if left_overs and user.user_groups:
                user_groups = [x.users_group_name for x in user.user_groups]
                raise UserOwnsUserGroupsException(
                    u'user "%s" still owns %s user groups and cannot be '
                    u'removed. Switch owners or remove those user groups:%s'
                    % (user.username, len(user_groups), ', '.join(user_groups)))

            # we might change the user data with detach/delete, make sure
            # the object is marked as expired before actually deleting !
            self.sa.expire(user)
            self.sa.delete(user)
            from rhodecode.lib.hooks_base import log_delete_user
            log_delete_user(deleted_by=cur_user, **user.get_dict())
        except Exception:
            log.error(traceback.format_exc())
            raise

    def reset_password_link(self, data, pwd_reset_url):
        """
        Send a password reset email containing `pwd_reset_url` to the user
        registered under `data['email']`, if such a user exists.

        :param data: dict with an `email` key
        :param pwd_reset_url: url placed into the email
        :returns: False when an exception was raised, True otherwise (also
            when no user matched the email)
        """
        from rhodecode.lib.celerylib import tasks, run_task
        from rhodecode.model.notification import EmailNotificationModel
        user_email = data['email']
        try:
            user = User.get_by_email(user_email)
            if user:
                log.debug('password reset user found %s', user)

                email_kwargs = {
                    'password_reset_url': pwd_reset_url,
                    'user': user,
                    'email': user_email,
                    'date': datetime.datetime.now()
                }

                (subject, headers, email_body,
                 email_body_plaintext) = EmailNotificationModel().render_email(
                    EmailNotificationModel.TYPE_PASSWORD_RESET, **email_kwargs)

                recipients = [user_email]

                action_logger_generic(
                    'sending password reset email to user: {}'.format(
                        user), namespace='security.password_reset')

                run_task(tasks.send_email, recipients, subject,
                         email_body_plaintext, email_body)

            else:
                log.debug("password reset email %s not found", user_email)
        except Exception:
            log.error(traceback.format_exc())
            return False

        return True

    def reset_password(self, data):
        """
        Generate a new password for the user registered under
        `data['email']`, remove the auth token `data['token']`, and email the
        new password. Errors are logged, not raised.

        :param data: dict with `email` and `token` keys
        :returns: always True
        """
        from rhodecode.lib.celerylib import tasks, run_task
        from rhodecode.model.notification import EmailNotificationModel
        from rhodecode.lib import auth
        user_email = data['email']
        pre_db = True
        try:
            user = User.get_by_email(user_email)
            new_passwd = auth.PasswordGenerator().gen_password(
                12, auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
            if user:
                user.password = auth.get_crypt_password(new_passwd)
                # also force this user to reset his password !
                user.update_userdata(force_password_change=True)

                Session().add(user)

                # now delete the token in question
                UserApiKeys = AuthTokenModel.cls
                UserApiKeys().query().filter(
                    UserApiKeys.api_key == data['token']).delete()

                Session().commit()
                log.info('successfully reset password for `%s`', user_email)

            if new_passwd is None:
                raise Exception('unable to generate new password')

            pre_db = False

            # NOTE(review): the confirmation mail below is sent even when no
            # user matched `user_email` - confirm callers validate the
            # address/token beforehand
            email_kwargs = {
                'new_password': new_passwd,
                'user': user,
                'email': user_email,
                'date': datetime.datetime.now()
            }

            (subject, headers, email_body,
             email_body_plaintext) = EmailNotificationModel().render_email(
                EmailNotificationModel.TYPE_PASSWORD_RESET_CONFIRMATION,
                **email_kwargs)

            recipients = [user_email]

            action_logger_generic(
                'sent new password to user: {} with email: {}'.format(
                    user, user_email), namespace='security.password_reset')

            run_task(tasks.send_email, recipients, subject,
                     email_body_plaintext, email_body)

        except Exception:
            log.error('Failed to update user password')
            log.error(traceback.format_exc())
            if pre_db:
                # we rollback only if local db stuff fails. If it goes into
                # run_task, we're past the rollback state and this wouldn't
                # work then
                Session().rollback()

        return True

    def fill_data(self, auth_user, user_id=None, api_key=None, username=None):
        """
        Fetches auth_user by user_id, or api_key, or username if present.
        Fills auth_user attributes with those taken from database.
        If an exception is raised during the lookup,
        `auth_user.is_authenticated` is set to False.

        :param auth_user: instance of user to set attributes
        :param user_id: user id to fetch by
        :param api_key: api key to fetch by
        :param username: username to fetch by
        :returns: True when a matching active user was found and its data
            copied onto `auth_user`, False otherwise
        """
        def token_obfuscate(token):
            if token:
                return token[:4] + "****"

        if user_id is None and api_key is None and username is None:
            raise Exception('You need to pass user_id, api_key or username')

        # never write the raw token into the logs, only its obfuscated form
        log.debug(
            'AuthUser: fill data execution based on: '
            'user_id:%s api_key:%s username:%s',
            user_id, token_obfuscate(api_key), username)
        try:
            dbuser = None
            if user_id:
                dbuser = self.get(user_id)
            elif api_key:
                dbuser = self.get_by_auth_token(api_key)
            elif username:
                dbuser = self.get_by_username(username)

            if not dbuser:
                log.warning(
                    'Unable to lookup user by id:%s api_key:%s username:%s',
                    user_id, token_obfuscate(api_key), username)
                return False
            if not dbuser.active:
                log.debug('User `%s:%s` is inactive, skipping fill data',
                          username, user_id)
                return False

            log.debug('AuthUser: filling found user:%s data', dbuser)
            user_data = dbuser.get_dict()

            user_data.update({
                # set explicit the safe escaped values
                'first_name': dbuser.first_name,
                'last_name': dbuser.last_name,
            })

            for k, v in user_data.items():
                # properties of auth user we dont update
                if k not in ['auth_tokens', 'permissions']:
                    setattr(auth_user, k, v)

        except Exception:
            log.error(traceback.format_exc())
            auth_user.is_authenticated = False
            return False

        return True

    def has_perm(self, user, perm):
        """
        Check whether a global permission is granted to a user.

        :param user: value resolved through `self._get_user`
        :param perm: value resolved through `self._get_perm`
        :returns: True when a matching `UserToPerm` row exists
        """
        perm = self._get_perm(perm)
        user = self._get_user(user)

        return UserToPerm.query().filter(UserToPerm.user == user)\
            .filter(UserToPerm.permission == perm).scalar() is not None

    def grant_perm(self, user, perm):
        """
        Grant user global permissions

        :param user:
        :param perm:
        :returns: the new `UserToPerm` object, or None when the permission
            was already granted
        """
        user = self._get_user(user)
        perm = self._get_perm(perm)
        # if this permission is already granted skip it
        _perm = UserToPerm.query()\
            .filter(UserToPerm.user == user)\
            .filter(UserToPerm.permission == perm)\
            .scalar()
        if _perm:
            return
        new = UserToPerm()
        new.user = user
        new.permission = perm
        self.sa.add(new)
        return new

    def revoke_perm(self, user, perm):
        """
        Revoke users global permissions

        :param user:
        :param perm:
        """
        user = self._get_user(user)
        perm = self._get_perm(perm)

        obj = UserToPerm.query()\
            .filter(UserToPerm.user == user)\
            .filter(UserToPerm.permission == perm)\
            .scalar()
        if obj:
            self.sa.delete(obj)

    def add_extra_email(self, user, email):
        """
        Adds email address to UserEmailMap

        :param user:
        :param email:
        """

        user = self._get_user(user)

        obj = UserEmailMap()
        obj.user = user
        obj.email = email
        self.sa.add(obj)
        return obj

    def delete_extra_email(self, user, email_id):
        """
        Removes email address from UserEmailMap

        :param user:
        :param email_id:
        """
        user = self._get_user(user)
        obj = UserEmailMap.query().get(email_id)
        # only delete entries that belong to the given user
        if obj and obj.user_id == user.user_id:
            self.sa.delete(obj)

    def parse_ip_range(self, ip_range):
        """
        Expand a comma separated list of addresses and `start-end` ranges
        into a list of unique entries, keeping first-seen order.

        Range bounds are parsed with `ipaddress.ip_address` and every address
        between them (inclusive) is emitted as a string. Entries without a
        `-` are passed through stripped but otherwise unvalidated. Empty and
        whitespace-only entries are skipped.

        :param ip_range: string such as `"10.0.0.1, 10.0.0.5-10.0.0.7"`
        :returns: list of unique entries
        """
        ip_list = []

        # first split by commas
        for entry in ip_range.split(','):
            # strip before the emptiness check so that whitespace-only
            # segments (e.g. "a, ,b") don't end up as '' in the result
            entry = entry.strip()
            if not entry:
                continue

            if '-' in entry:
                start_ip, end_ip = entry.split('-', 1)
                start_ip = ipaddress.ip_address(safe_unicode(start_ip.strip()))
                end_ip = ipaddress.ip_address(safe_unicode(end_ip.strip()))

                for index in xrange(int(start_ip), int(end_ip) + 1):
                    new_ip = ipaddress.ip_address(index)
                    ip_list.append(str(new_ip))
            else:
                ip_list.append(entry)

        # order-preserving de-duplication with O(1) membership checks
        seen = set()
        unique_ips = []
        for ip in ip_list:
            if ip not in seen:
                seen.add(ip)
                unique_ips.append(ip)
        return unique_ips

    def add_extra_ip(self, user, ip, description=None):
        """
        Adds ip address to UserIpMap

        :param user:
        :param ip:
        :param description: optional description stored with the entry
        """

        user = self._get_user(user)
        obj = UserIpMap()
        obj.user = user
        obj.ip_addr = ip
        obj.description = description
        self.sa.add(obj)
        return obj

    auth_token_role = AuthTokenModel.cls

    def add_auth_token(self, user, lifetime_minutes, role, description=u'',
                       scope_callback=None):
        """
        Add AuthToken for user.

        :param user: username/user_id
        :param lifetime_minutes: in minutes the lifetime for token, -1 equals no limit
        :param role: one of AuthTokenModel.cls.ROLE_*
        :param description: optional string description
        :param scope_callback: optional callable invoked with the new token
        """

        token = AuthTokenModel().create(
            user, description, lifetime_minutes, role)
        if scope_callback and callable(scope_callback):
            # call the callback if we provide, used to attach scope for EE edition
            scope_callback(token)
        return token

    def delete_extra_ip(self, user, ip_id):
        """
        Removes ip address from UserIpMap

        :param user:
        :param ip_id:
        """
        user = self._get_user(user)
        obj = UserIpMap.query().get(ip_id)
        # only delete entries that belong to the given user
        if obj and obj.user_id == user.user_id:
            self.sa.delete(obj)

    def get_accounts_in_creation_order(self, current_user=None):
        """
        Get accounts in order of creation for deactivation for license limits

        pick currently logged in user, and append to the list in position 0
        pick all super-admins in order of creation date and add it to the list
        pick all other accounts in order of creation and add it to the list.

        Based on that list, the last accounts can be disabled as they are
        created at the end and don't include any of the super admins as well
        as the current user.

        :param current_user: optionally current user running this operation
        :returns: list of user ids
        """

        if not current_user:
            current_user = get_current_rhodecode_user()
        active_super_admins = [
            x.user_id for x in User.query()
            .filter(User.user_id != current_user.user_id)
            .filter(User.active == true())
            .filter(User.admin == true())
            .order_by(User.created_on.asc())]

        active_regular_users = [
            x.user_id for x in User.query()
            .filter(User.user_id != current_user.user_id)
            .filter(User.active == true())
            .filter(User.admin == false())
            .order_by(User.created_on.asc())]

        list_of_accounts = [current_user.user_id]
        list_of_accounts += active_super_admins
        list_of_accounts += active_regular_users

        return list_of_accounts

    def deactivate_last_users(self, expected_users, current_user=None):
        """
        Deactivate accounts that are over the license limits.
        Algorithm of which accounts to disable is based on the formula:

        Get current user, then super admins in creation order, then regular
        active users in creation order.

        Using that list we mark all accounts from the end of it as inactive.
        This way we block only latest created accounts.

        :param expected_users: number of accounts to keep active; accounts
            at index `expected_users + 1` and beyond in the ordered list are
            deactivated
        :param current_user: optionally current user running this operation
        """

        list_of_accounts = self.get_accounts_in_creation_order(
            current_user=current_user)

        for acc_id in list_of_accounts[expected_users + 1:]:
            user = User.get(acc_id)
            log.info('Deactivating account %s for license unlock', user)
            user.active = False
            Session().add(user)
            # NOTE(review): commit is issued per deactivated account
            Session().commit()

        return

    def get_user_log(self, user, filter_term):
        """
        Build the query of `UserLog` entries for a user, newest first.

        :param user: object with `user_id` and `username` attributes
        :param filter_term: passed to `user_log_filter` to narrow the query
        :returns: the filtered query as returned by `user_log_filter`
        """
        user_log = UserLog.query()\
            .filter(or_(UserLog.user_id == user.user_id,
                        UserLog.username == user.username))\
            .options(joinedload(UserLog.user))\
            .options(joinedload(UserLog.repository))\
            .order_by(UserLog.action_date.desc())

        user_log = user_log_filter(user_log, filter_term)
        return user_log