# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ rhodecode.model.user ~~~~~~~~~~~~~~~~~~~~ users model for RhodeCode :created_on: Apr 9, 2010 :author: marcink :copyright: (c) 2013 RhodeCode GmbH. :license: GPLv3, see LICENSE for more details. """ import logging import traceback from pylons import url from pylons.i18n.translation import _ from sqlalchemy.exc import DatabaseError from rhodecode.lib.utils2 import safe_unicode, generate_api_key, get_current_rhodecode_user from rhodecode.lib.caching_query import FromCache from rhodecode.model import BaseModel from rhodecode.model.db import User, UserToPerm, Notification, \ UserEmailMap, UserIpMap from rhodecode.lib.exceptions import DefaultUserException, \ UserOwnsReposException from rhodecode.model.meta import Session log = logging.getLogger(__name__) class UserModel(BaseModel): cls = User def get(self, user_id, cache=False): user = self.sa.query(User) if cache: user = user.options(FromCache("sql_cache_short", "get_user_%s" % user_id)) return user.get(user_id) def get_user(self, user): return self._get_user(user) def get_by_username(self, username, cache=False, case_insensitive=False): if case_insensitive: user = self.sa.query(User).filter(User.username.ilike(username)) else: user = self.sa.query(User)\ .filter(User.username == username) if cache: user = user.options(FromCache("sql_cache_short", "get_user_%s" % username)) return user.scalar() def get_by_email(self, email, cache=False, case_insensitive=False): return User.get_by_email(email, case_insensitive, cache) def get_by_api_key(self, api_key, cache=False): return User.get_by_api_key(api_key, cache) def create(self, form_data, cur_user=None): if not cur_user: cur_user = getattr(get_current_rhodecode_user(), 'username', None) from rhodecode.lib.hooks import log_create_user, check_allowed_create_user _fd = form_data user_data = { 'username': _fd['username'], 'password': _fd['password'], 'email': _fd['email'], 'firstname': _fd['firstname'], 'lastname': _fd['lastname'], 'active': _fd['active'], 'admin': False } # raises UserCreationError if it's not allowed check_allowed_create_user(user_data, cur_user) from rhodecode.lib.auth import get_crypt_password try: new_user = User() for k, v in form_data.items(): if k == 'password': v = get_crypt_password(v) if k == 'firstname': k = 'name' setattr(new_user, k, v) new_user.api_key = generate_api_key(form_data['username']) self.sa.add(new_user) log_create_user(new_user.get_dict(), cur_user) return new_user except Exception: log.error(traceback.format_exc()) raise def create_or_update(self, username, password, email, firstname='', lastname='', active=True, admin=False, extern_type=None, extern_name=None, cur_user=None): """ Creates a new instance if not found, or updates current one :param username: :param password: :param email: :param active: :param firstname: :param lastname: :param active: :param admin: :param extern_name: :param extern_type: :param cur_user: """ if not cur_user: cur_user = getattr(get_current_rhodecode_user(), 'username', None) from rhodecode.lib.auth import get_crypt_password, check_password from rhodecode.lib.hooks import log_create_user, check_allowed_create_user user_data = { 'username': username, 'password': password, 'email': email, 'firstname': firstname, 'lastname': lastname, 'active': active, 'admin': admin } # raises UserCreationError if it's not allowed check_allowed_create_user(user_data, cur_user) log.debug('Checking for %s account in RhodeCode database' % username) user = User.get_by_username(username, case_insensitive=True) if user is None: log.debug('creating new user %s' % username) new_user = User() edit = False else: log.debug('updating user %s' % username) new_user = user edit = True try: new_user.username = username new_user.admin = admin new_user.email = email new_user.active = active new_user.extern_name = safe_unicode(extern_name) if extern_name else None new_user.extern_type = safe_unicode(extern_type) if extern_type else None new_user.name = firstname new_user.lastname = lastname if not edit: new_user.api_key = generate_api_key(username) # set password only if creating an user or password is changed password_change = new_user.password and not check_password(password, new_user.password) if not edit or password_change: reason = 'new password' if edit else 'new user' log.debug('Updating password reason=>%s' % (reason,)) new_user.password = get_crypt_password(password) if password else None self.sa.add(new_user) if not edit: log_create_user(new_user.get_dict(), cur_user) return new_user except (DatabaseError,): log.error(traceback.format_exc()) raise def create_registration(self, form_data): from rhodecode.model.notification import NotificationModel try: form_data['admin'] = False form_data['extern_name'] = 'rhodecode' form_data['extern_type'] = 'rhodecode' new_user = self.create(form_data) self.sa.add(new_user) self.sa.flush() # notification to admins subject = _('New user registration') body = ('New user registration\n' '---------------------\n' '- Username: %s\n' '- Full Name: %s\n' '- Email: %s\n') body = body % (new_user.username, new_user.full_name, new_user.email) edit_url = url('edit_user', id=new_user.user_id, qualified=True) kw = {'registered_user_url': edit_url} NotificationModel().create(created_by=new_user, subject=subject, body=body, recipients=None, type_=Notification.TYPE_REGISTRATION, email_kwargs=kw) except Exception: log.error(traceback.format_exc()) raise def update(self, user_id, form_data, skip_attrs=[]): from rhodecode.lib.auth import get_crypt_password try: user = self.get(user_id, cache=False) if user.username == User.DEFAULT_USER: raise DefaultUserException( _("You can't Edit this user since it's " "crucial for entire application")) for k, v in form_data.items(): if k in skip_attrs: continue if k == 'new_password' and v: user.password = get_crypt_password(v) else: # old legacy thing orm models store firstname as name, # need proper refactor to username if k == 'firstname': k = 'name' setattr(user, k, v) self.sa.add(user) except Exception: log.error(traceback.format_exc()) raise def update_user(self, user, **kwargs): from rhodecode.lib.auth import get_crypt_password try: user = self._get_user(user) if user.username == User.DEFAULT_USER: raise DefaultUserException( _("You can't Edit this user since it's" " crucial for entire application") ) for k, v in kwargs.items(): if k == 'password' and v: v = get_crypt_password(v) setattr(user, k, v) self.sa.add(user) return user except Exception: log.error(traceback.format_exc()) raise def delete(self, user, cur_user=None): if not cur_user: cur_user = getattr(get_current_rhodecode_user(), 'username', None) user = self._get_user(user) try: if user.username == User.DEFAULT_USER: raise DefaultUserException( _(u"You can't remove this user since it's" " crucial for entire application") ) if user.repositories: repos = [x.repo_name for x in user.repositories] raise UserOwnsReposException( _(u'user "%s" still owns %s repositories and cannot be ' 'removed. Switch owners or remove those repositories. %s') % (user.username, len(repos), ', '.join(repos)) ) self.sa.delete(user) from rhodecode.lib.hooks import log_delete_user log_delete_user(user.get_dict(), cur_user) except Exception: log.error(traceback.format_exc()) raise def reset_password_link(self, data): from rhodecode.lib.celerylib import tasks, run_task from rhodecode.model.notification import EmailNotificationModel user_email = data['email'] try: user = User.get_by_email(user_email) if user: log.debug('password reset user found %s' % user) link = url('reset_password_confirmation', key=user.api_key, qualified=True) reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET body = EmailNotificationModel().get_email_tmpl(reg_type, **{'user': user.short_contact, 'reset_url': link}) log.debug('sending email') run_task(tasks.send_email, user_email, _("Password reset link"), body, body) log.info('send new password mail to %s' % user_email) else: log.debug("password reset email %s not found" % user_email) except Exception: log.error(traceback.format_exc()) return False return True def reset_password(self, data): from rhodecode.lib.celerylib import tasks, run_task from rhodecode.lib import auth user_email = data['email'] pre_db = True try: user = User.get_by_email(user_email) new_passwd = auth.PasswordGenerator().gen_password(8, auth.PasswordGenerator.ALPHABETS_BIG_SMALL) if user: user.password = auth.get_crypt_password(new_passwd) Session().add(user) Session().commit() log.info('change password for %s' % user_email) if new_passwd is None: raise Exception('unable to generate new password') pre_db = False run_task(tasks.send_email, user_email, _('Your new password'), _('Your new RhodeCode password:%s') % (new_passwd,)) log.info('send new password mail to %s' % user_email) except Exception: log.error('Failed to update user password') log.error(traceback.format_exc()) if pre_db: # we rollback only if local db stuff fails. If it goes into # run_task, we're pass rollback state this wouldn't work then Session().rollback() return True def fill_data(self, auth_user, user_id=None, api_key=None, username=None): """ Fetches auth_user by user_id,or api_key if present. Fills auth_user attributes with those taken from database. Additionally set's is_authenitated if lookup fails present in database :param auth_user: instance of user to set attributes :param user_id: user id to fetch by :param api_key: api key to fetch by :param username: username to fetch by """ if user_id is None and api_key is None and username is None: raise Exception('You need to pass user_id, api_key or username') try: dbuser = None if user_id: dbuser = self.get(user_id) elif api_key: dbuser = self.get_by_api_key(api_key) elif username: dbuser = self.get_by_username(username) if dbuser is not None and dbuser.active: log.debug('filling %s data' % dbuser) for k, v in dbuser.get_dict().iteritems(): if k not in ['api_keys', 'permissions']: setattr(auth_user, k, v) else: return False except Exception: log.error(traceback.format_exc()) auth_user.is_authenticated = False return False return True def has_perm(self, user, perm): perm = self._get_perm(perm) user = self._get_user(user) return UserToPerm.query().filter(UserToPerm.user == user)\ .filter(UserToPerm.permission == perm).scalar() is not None def grant_perm(self, user, perm): """ Grant user global permissions :param user: :param perm: """ user = self._get_user(user) perm = self._get_perm(perm) # if this permission is already granted skip it _perm = UserToPerm.query()\ .filter(UserToPerm.user == user)\ .filter(UserToPerm.permission == perm)\ .scalar() if _perm: return new = UserToPerm() new.user = user new.permission = perm self.sa.add(new) return new def revoke_perm(self, user, perm): """ Revoke users global permissions :param user: :param perm: """ user = self._get_user(user) perm = self._get_perm(perm) obj = UserToPerm.query()\ .filter(UserToPerm.user == user)\ .filter(UserToPerm.permission == perm)\ .scalar() if obj: self.sa.delete(obj) def add_extra_email(self, user, email): """ Adds email address to UserEmailMap :param user: :param email: """ from rhodecode.model import forms form = forms.UserExtraEmailForm()() data = form.to_python(dict(email=email)) user = self._get_user(user) obj = UserEmailMap() obj.user = user obj.email = data['email'] self.sa.add(obj) return obj def delete_extra_email(self, user, email_id): """ Removes email address from UserEmailMap :param user: :param email_id: """ user = self._get_user(user) obj = UserEmailMap.query().get(email_id) if obj: self.sa.delete(obj) def add_extra_ip(self, user, ip): """ Adds ip address to UserIpMap :param user: :param ip: """ from rhodecode.model import forms form = forms.UserExtraIpForm()() data = form.to_python(dict(ip=ip)) user = self._get_user(user) obj = UserIpMap() obj.user = user obj.ip_addr = data['ip'] self.sa.add(obj) return obj def delete_extra_ip(self, user, ip_id): """ Removes ip address from UserIpMap :param user: :param ip_id: """ user = self._get_user(user) obj = UserIpMap.query().get(ip_id) if obj: self.sa.delete(obj)