|
|
# -*- coding: utf-8 -*-
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
# (at your option) any later version.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
"""
|
|
|
rhodecode.model.user
|
|
|
~~~~~~~~~~~~~~~~~~~~
|
|
|
|
|
|
users model for RhodeCode
|
|
|
|
|
|
:created_on: Apr 9, 2010
|
|
|
:author: marcink
|
|
|
:copyright: (c) 2013 RhodeCode GmbH.
|
|
|
:license: GPLv3, see LICENSE for more details.
|
|
|
"""
|
|
|
|
|
|
|
|
|
import logging
|
|
|
import traceback
|
|
|
from pylons import url
|
|
|
from pylons.i18n.translation import _
|
|
|
|
|
|
from sqlalchemy.exc import DatabaseError
|
|
|
|
|
|
|
|
|
from rhodecode.lib.utils2 import safe_unicode, generate_api_key, get_current_rhodecode_user
|
|
|
from rhodecode.lib.caching_query import FromCache
|
|
|
from rhodecode.model import BaseModel
|
|
|
from rhodecode.model.db import User, UserToPerm, Notification, \
|
|
|
UserEmailMap, UserIpMap
|
|
|
from rhodecode.lib.exceptions import DefaultUserException, \
|
|
|
UserOwnsReposException
|
|
|
from rhodecode.model.meta import Session
|
|
|
|
|
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class UserModel(BaseModel):
    """
    Model-layer operations on :class:`User` rows: lookups, creation,
    update, deletion, password reset, global permissions and the extra
    e-mail / IP address maps.

    Apart from :meth:`reset_password` (which commits through ``Session()``)
    and :meth:`create_registration` (which flushes), the methods here only
    add objects to / delete objects from ``self.sa``; committing is left to
    the caller.
    """

    # Model class handled by this model.
    # NOTE(review): presumably consumed by BaseModel helpers - confirm.
    cls = User

    def get(self, user_id, cache=False):
        """
        Return the user fetched with ``Query.get(user_id)``.

        :param user_id: identifier passed to ``Query.get``
        :param cache: when true, attach the ``sql_cache_short`` cache option
            under the key ``get_user_<user_id>``
        """
        query = self.sa.query(User)
        if cache:
            # NOTE(review): get_by_username() uses the same "get_user_%s"
            # key pattern - confirm ids and usernames cannot collide.
            query = query.options(FromCache("sql_cache_short",
                                            "get_user_%s" % user_id))
        return query.get(user_id)

    def get_user(self, user):
        """
        Public wrapper around ``self._get_user``.

        :param user: value accepted by ``self._get_user``
        """
        return self._get_user(user)

    def get_by_username(self, username, cache=False, case_insensitive=False):
        """
        Return the single user whose username matches ``username``.

        :param username: username to look for
        :param cache: when true, attach the ``sql_cache_short`` cache option
            under the key ``get_user_<username>``
        :param case_insensitive: match with ``ILIKE`` instead of equality
        :returns: result of ``Query.scalar()`` on the filtered query
        """
        query = self.sa.query(User)
        if case_insensitive:
            query = query.filter(User.username.ilike(username))
        else:
            query = query.filter(User.username == username)

        if cache:
            query = query.options(FromCache("sql_cache_short",
                                            "get_user_%s" % username))
        return query.scalar()

    def get_by_email(self, email, cache=False, case_insensitive=False):
        """
        Delegate to ``User.get_by_email``.

        Note the positional order expected by the delegate:
        ``(email, case_insensitive, cache)``.
        """
        return User.get_by_email(email, case_insensitive, cache)

    def get_by_api_key(self, api_key, cache=False):
        """Delegate to ``User.get_by_api_key(api_key, cache)``."""
        return User.get_by_api_key(api_key, cache)

    def create(self, form_data, cur_user=None):
        """
        Create a new :class:`User` from ``form_data`` and add it to the
        session.

        Every key of ``form_data`` is set as an attribute on the new user;
        ``password`` is passed through ``get_crypt_password`` first and
        ``firstname`` is stored under the attribute ``name``.

        :param form_data: dict that must contain at least ``username``,
            ``password``, ``email``, ``firstname``, ``lastname`` and
            ``active``
        :param cur_user: name of the acting user; when falsy it is taken
            from the ``username`` attribute of
            ``get_current_rhodecode_user()`` (``None`` if absent)
        :returns: the new, uncommitted :class:`User`
        :raises Exception: anything raised while building the user is logged
            and re-raised; ``check_allowed_create_user`` may also raise
            before that (UserCreationError, per the original comment)
        """
        if not cur_user:
            cur_user = getattr(get_current_rhodecode_user(), 'username', None)

        # imported locally, as in the rest of this class
        from rhodecode.lib.hooks import log_create_user, check_allowed_create_user
        _fd = form_data
        user_data = {
            'username': _fd['username'], 'password': _fd['password'],
            'email': _fd['email'], 'firstname': _fd['firstname'], 'lastname': _fd['lastname'],
            'active': _fd['active'], 'admin': False
        }
        # raises UserCreationError if it's not allowed
        check_allowed_create_user(user_data, cur_user)
        from rhodecode.lib.auth import get_crypt_password
        try:
            new_user = User()
            for k, v in form_data.items():
                if k == 'password':
                    v = get_crypt_password(v)
                if k == 'firstname':
                    # the ORM model stores the first name as ``name``
                    k = 'name'
                setattr(new_user, k, v)

            new_user.api_key = generate_api_key(form_data['username'])
            self.sa.add(new_user)

            log_create_user(new_user.get_dict(), cur_user)
            return new_user
        except Exception:
            log.error(traceback.format_exc())
            raise

    def create_or_update(self, username, password, email, firstname='',
                         lastname='', active=True, admin=False,
                         extern_type=None, extern_name=None, cur_user=None):
        """
        Creates a new instance if not found, or updates current one

        The existing user is looked up by ``username``, case-insensitively.
        A new API key is generated only when a user is created. The password
        is (re)hashed when a user is created, or when the existing user has
        a password and ``check_password`` reports that ``password`` does not
        match it.

        :param username:
        :param password: plain-text password; a falsy value stores ``None``
            when the password is (re)set
        :param email:
        :param firstname: stored under the ``name`` attribute
        :param lastname:
        :param active:
        :param admin:
        :param extern_type: converted with ``safe_unicode``; falsy -> ``None``
        :param extern_name: converted with ``safe_unicode``; falsy -> ``None``
        :param cur_user: name of the acting user; when falsy it is taken
            from ``get_current_rhodecode_user()``
        :returns: the created or updated, uncommitted :class:`User`
        :raises DatabaseError: logged and re-raised
        """
        if not cur_user:
            cur_user = getattr(get_current_rhodecode_user(), 'username', None)

        from rhodecode.lib.auth import get_crypt_password, check_password
        from rhodecode.lib.hooks import log_create_user, check_allowed_create_user
        user_data = {
            'username': username, 'password': password,
            'email': email, 'firstname': firstname, 'lastname': lastname,
            'active': active, 'admin': admin
        }
        # raises UserCreationError if it's not allowed
        check_allowed_create_user(user_data, cur_user)

        log.debug('Checking for %s account in RhodeCode database', username)
        user = User.get_by_username(username, case_insensitive=True)
        if user is None:
            log.debug('creating new user %s', username)
            new_user = User()
            edit = False
        else:
            log.debug('updating user %s', username)
            new_user = user
            edit = True

        try:
            new_user.username = username
            new_user.admin = admin
            new_user.email = email
            new_user.active = active
            new_user.extern_name = safe_unicode(extern_name) if extern_name else None
            new_user.extern_type = safe_unicode(extern_type) if extern_type else None
            new_user.name = firstname
            new_user.lastname = lastname

            if not edit:
                new_user.api_key = generate_api_key(username)

            # set password only if creating an user or password is changed
            password_change = new_user.password and not check_password(
                password, new_user.password)
            if not edit or password_change:
                reason = 'new password' if edit else 'new user'
                log.debug('Updating password reason=>%s', reason)
                new_user.password = get_crypt_password(password) if password else None

            self.sa.add(new_user)

            if not edit:
                # the creation hook fires only for genuinely new users
                log_create_user(new_user.get_dict(), cur_user)
            return new_user
        except (DatabaseError,):
            log.error(traceback.format_exc())
            raise

    def create_registration(self, form_data):
        """
        Register a new user from ``form_data`` and create a registration
        notification.

        ``form_data`` is modified in place: ``admin`` is forced to ``False``
        and ``extern_name`` / ``extern_type`` are set to ``'rhodecode'``
        before it is handed to :meth:`create`. The session is flushed so the
        new user's ``user_id`` can be used in the edit URL.

        :param form_data: registration form data, see :meth:`create`
        :returns: ``None``
        :raises Exception: any failure is logged and re-raised
        """
        from rhodecode.model.notification import NotificationModel

        try:
            form_data['admin'] = False
            form_data['extern_name'] = 'rhodecode'
            form_data['extern_type'] = 'rhodecode'
            new_user = self.create(form_data)

            self.sa.add(new_user)
            self.sa.flush()

            # notification to admins
            subject = _('New user registration')
            body = ('New user registration\n'
                    '---------------------\n'
                    '- Username: %s\n'
                    '- Full Name: %s\n'
                    '- Email: %s\n')
            body = body % (new_user.username, new_user.full_name, new_user.email)
            edit_url = url('edit_user', id=new_user.user_id, qualified=True)
            kw = {'registered_user_url': edit_url}
            NotificationModel().create(created_by=new_user, subject=subject,
                                       body=body, recipients=None,
                                       type_=Notification.TYPE_REGISTRATION,
                                       email_kwargs=kw)

        except Exception:
            log.error(traceback.format_exc())
            raise

    def update(self, user_id, form_data, skip_attrs=None):
        """
        Update the user identified by ``user_id`` from ``form_data``.

        A truthy ``new_password`` value is hashed and stored as
        ``password``; ``firstname`` is stored under ``name``; every other
        key (including a falsy ``new_password``) is set as an attribute of
        the same name.

        :param user_id: id passed to :meth:`get`
        :param form_data: dict of attribute names to new values
        :param skip_attrs: optional container of ``form_data`` keys to
            ignore; defaults to nothing
        :returns: ``None``
        :raises DefaultUserException: when the target is the default user
        :raises Exception: any other failure is logged and re-raised
        """
        from rhodecode.lib.auth import get_crypt_password
        # ``None`` sentinel instead of a shared mutable ``[]`` default
        skip_attrs = skip_attrs or ()
        try:
            user = self.get(user_id, cache=False)
            if user.username == User.DEFAULT_USER:
                raise DefaultUserException(
                    _("You can't Edit this user since it's "
                      "crucial for entire application"))

            for k, v in form_data.items():
                if k in skip_attrs:
                    continue
                if k == 'new_password' and v:
                    user.password = get_crypt_password(v)
                else:
                    # old legacy thing orm models store firstname as name,
                    # need proper refactor to username
                    if k == 'firstname':
                        k = 'name'
                    setattr(user, k, v)
            self.sa.add(user)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def update_user(self, user, **kwargs):
        """
        Set the given keyword arguments as attributes on ``user``.

        A truthy ``password`` value is hashed with ``get_crypt_password``
        before being stored.

        :param user: value accepted by ``self._get_user``
        :param kwargs: attribute names mapped to their new values
        :returns: the updated, uncommitted user
        :raises DefaultUserException: when the target is the default user
        :raises Exception: any other failure is logged and re-raised
        """
        from rhodecode.lib.auth import get_crypt_password
        try:
            user = self._get_user(user)
            if user.username == User.DEFAULT_USER:
                raise DefaultUserException(
                    _("You can't Edit this user since it's"
                      " crucial for entire application")
                )

            for k, v in kwargs.items():
                if k == 'password' and v:
                    v = get_crypt_password(v)

                setattr(user, k, v)
            self.sa.add(user)
            return user
        except Exception:
            log.error(traceback.format_exc())
            raise

    def delete(self, user, cur_user=None):
        """
        Delete ``user`` from the session and call the ``log_delete_user``
        hook.

        :param user: value accepted by ``self._get_user``
        :param cur_user: name of the acting user; when falsy it is taken
            from ``get_current_rhodecode_user()``
        :raises DefaultUserException: when the target is the default user
        :raises UserOwnsReposException: when the user still has
            repositories
        :raises Exception: any other failure is logged and re-raised
        """
        if not cur_user:
            cur_user = getattr(get_current_rhodecode_user(), 'username', None)
        user = self._get_user(user)

        try:
            if user.username == User.DEFAULT_USER:
                raise DefaultUserException(
                    _(u"You can't remove this user since it's"
                      " crucial for entire application")
                )
            if user.repositories:
                repos = [x.repo_name for x in user.repositories]
                raise UserOwnsReposException(
                    _(u'user "%s" still owns %s repositories and cannot be '
                      'removed. Switch owners or remove those repositories. %s')
                    % (user.username, len(repos), ', '.join(repos))
                )
            self.sa.delete(user)

            from rhodecode.lib.hooks import log_delete_user
            log_delete_user(user.get_dict(), cur_user)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def reset_password_link(self, data):
        """
        E-mail a password-reset confirmation link to ``data['email']``.

        The link is built from the user's ``api_key``. Nothing is sent when
        no user is found for the address; that case is only logged.

        :param data: dict with an ``email`` key
        :returns: ``False`` when an exception occurred, ``True`` otherwise
            (including when no user matched the address)
        """
        from rhodecode.lib.celerylib import tasks, run_task
        from rhodecode.model.notification import EmailNotificationModel
        user_email = data['email']
        try:
            user = User.get_by_email(user_email)
            if user:
                log.debug('password reset user found %s', user)
                link = url('reset_password_confirmation', key=user.api_key,
                           qualified=True)
                reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
                body = EmailNotificationModel().get_email_tmpl(
                    reg_type,
                    **{'user': user.short_contact,
                       'reset_url': link})
                log.debug('sending email')
                run_task(tasks.send_email, user_email,
                         _("Password reset link"), body, body)
                log.info('send new password mail to %s', user_email)
            else:
                log.debug("password reset email %s not found", user_email)
        except Exception:
            log.error(traceback.format_exc())
            return False

        return True

    def reset_password(self, data):
        """
        Generate a new 8-character password, store it for the user owning
        ``data['email']`` and e-mail the plain-text password to that
        address.

        Unlike most methods of this class, this one commits the session
        itself. Failures are logged, never raised; the session is rolled
        back only if the failure happened before the e-mail task was
        started.

        :param data: dict with an ``email`` key
        :returns: always ``True``, even when the reset failed
        """
        from rhodecode.lib.celerylib import tasks, run_task
        from rhodecode.lib import auth
        user_email = data['email']
        pre_db = True
        try:
            user = User.get_by_email(user_email)
            new_passwd = auth.PasswordGenerator().gen_password(
                8, auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
            # validate the generated password *before* anything is hashed
            # and committed, so a generator failure never reaches the DB
            if new_passwd is None:
                raise Exception('unable to generate new password')
            if user:
                user.password = auth.get_crypt_password(new_passwd)
                Session().add(user)
                Session().commit()
                log.info('change password for %s', user_email)

            pre_db = False
            # NOTE(review): the mail is sent even when no user matched the
            # address (original behaviour, kept) - confirm this is intended.
            run_task(tasks.send_email, user_email,
                     _('Your new password'),
                     _('Your new RhodeCode password:%s') % (new_passwd,))
            log.info('send new password mail to %s', user_email)

        except Exception:
            log.error('Failed to update user password')
            log.error(traceback.format_exc())
            if pre_db:
                # we rollback only if local db stuff fails. If it goes into
                # run_task, we're pass rollback state this wouldn't work then
                Session().rollback()

        return True

    def fill_data(self, auth_user, user_id=None, api_key=None, username=None):
        """
        Fetch a user by ``user_id``, else ``api_key``, else ``username``
        (the first truthy one wins) and copy its data onto ``auth_user``.

        Every item of the database user's ``get_dict()`` except
        ``api_keys`` and ``permissions`` is set as an attribute on
        ``auth_user``. ``auth_user.is_authenticated`` is set to ``False``
        only when an exception occurs during the lookup or copy; it is left
        untouched when the user is simply missing or inactive.

        :param auth_user: instance of user to set attributes
        :param user_id: user id to fetch by
        :param api_key: api key to fetch by
        :param username: username to fetch by
        :returns: ``True`` when an active user was found and copied,
            ``False`` otherwise
        :raises Exception: when all three lookup arguments are ``None``
        """
        if user_id is None and api_key is None and username is None:
            raise Exception('You need to pass user_id, api_key or username')

        try:
            dbuser = None
            if user_id:
                dbuser = self.get(user_id)
            elif api_key:
                dbuser = self.get_by_api_key(api_key)
            elif username:
                dbuser = self.get_by_username(username)

            if dbuser is not None and dbuser.active:
                log.debug('filling %s data', dbuser)
                # ``items()`` works on both Python 2 and Python 3 dicts
                for k, v in dbuser.get_dict().items():
                    if k not in ['api_keys', 'permissions']:
                        setattr(auth_user, k, v)
            else:
                return False

        except Exception:
            log.error(traceback.format_exc())
            auth_user.is_authenticated = False
            return False

        return True

    def has_perm(self, user, perm):
        """
        Tell whether ``user`` has the global permission ``perm``.

        :param user: value accepted by ``self._get_user``
        :param perm: value accepted by ``self._get_perm``
        :returns: ``True`` when a matching :class:`UserToPerm` row exists
        """
        perm = self._get_perm(perm)
        user = self._get_user(user)

        return UserToPerm.query().filter(UserToPerm.user == user)\
            .filter(UserToPerm.permission == perm).scalar() is not None

    def grant_perm(self, user, perm):
        """
        Grant user global permissions

        :param user:
        :param perm:
        :returns: the new :class:`UserToPerm`, or ``None`` when the
            permission was already granted
        """
        user = self._get_user(user)
        perm = self._get_perm(perm)
        # if this permission is already granted skip it
        _perm = UserToPerm.query()\
            .filter(UserToPerm.user == user)\
            .filter(UserToPerm.permission == perm)\
            .scalar()
        if _perm:
            return
        new = UserToPerm()
        new.user = user
        new.permission = perm
        self.sa.add(new)
        return new

    def revoke_perm(self, user, perm):
        """
        Revoke users global permissions

        Does nothing when the user does not hold the permission.

        :param user:
        :param perm:
        """
        user = self._get_user(user)
        perm = self._get_perm(perm)

        obj = UserToPerm.query()\
            .filter(UserToPerm.user == user)\
            .filter(UserToPerm.permission == perm)\
            .scalar()
        if obj:
            self.sa.delete(obj)

    def add_extra_email(self, user, email):
        """
        Adds email address to UserEmailMap

        The address is first run through ``UserExtraEmailForm``'s
        ``to_python``, and the converted value is what gets stored.

        :param user:
        :param email:
        :returns: the new, uncommitted :class:`UserEmailMap`
        """
        from rhodecode.model import forms
        form = forms.UserExtraEmailForm()()
        data = form.to_python(dict(email=email))
        user = self._get_user(user)

        obj = UserEmailMap()
        obj.user = user
        obj.email = data['email']
        self.sa.add(obj)
        return obj

    def delete_extra_email(self, user, email_id):
        """
        Removes email address from UserEmailMap

        Does nothing when no row with ``email_id`` exists.

        :param user:
        :param email_id:
        """
        # NOTE(review): ``user`` is resolved but the fetched row is never
        # checked to belong to it - confirm callers enforce ownership.
        user = self._get_user(user)
        obj = UserEmailMap.query().get(email_id)
        if obj:
            self.sa.delete(obj)

    def add_extra_ip(self, user, ip):
        """
        Adds ip address to UserIpMap

        The address is first run through ``UserExtraIpForm``'s
        ``to_python``, and the converted value is what gets stored.

        :param user:
        :param ip:
        :returns: the new, uncommitted :class:`UserIpMap`
        """
        from rhodecode.model import forms
        form = forms.UserExtraIpForm()()
        data = form.to_python(dict(ip=ip))
        user = self._get_user(user)

        obj = UserIpMap()
        obj.user = user
        obj.ip_addr = data['ip']
        self.sa.add(obj)
        return obj

    def delete_extra_ip(self, user, ip_id):
        """
        Removes ip address from UserIpMap

        Does nothing when no row with ``ip_id`` exists.

        :param user:
        :param ip_id:
        """
        # NOTE(review): ``user`` is resolved but the fetched row is never
        # checked to belong to it - confirm callers enforce ownership.
        user = self._get_user(user)
        obj = UserIpMap.query().get(ip_id)
        if obj:
            self.sa.delete(obj)
|
|
|
|