user.py
514 lines
| 18.8 KiB
| text/x-python
|
PythonLexer
|
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.model.user
~~~~~~~~~~~~~~~~~~~~

users model for Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 9, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""
|
# Standard library
import hashlib
import hmac
import logging
import time
import traceback

# Third-party
from sqlalchemy.exc import DatabaseError
from tg import config
from tg.i18n import ugettext as _

# Kallithea
from kallithea.lib.caching_query import FromCache
from kallithea.lib.exceptions import (
    DefaultUserException,
    UserOwnsReposException,
)
from kallithea.lib.utils2 import (
    generate_api_key,
    get_current_authuser,
)
from kallithea.model.db import (
    Permission,
    User,
    UserEmailMap,
    UserIpMap,
    UserToPerm,
)
from kallithea.model.meta import Session


# Module-level logger, named after this module.
log = logging.getLogger(__name__)
|
class UserModel(object):
    """
    Model-layer operations on ``User`` rows and their related tables:
    creation, update and deletion of users, password reset tokens, global
    permissions, and the extra email / IP address maps.

    Apart from ``reset_password``, no method in this class commits the
    database session; they only add to, flush, or delete from it.
    """

    # Maximum age, in seconds, of a password reset token that
    # verify_reset_password_token() still accepts.
    password_reset_token_lifetime = 86400  # 24 hours

    def get(self, user_id, cache=False):
        """
        Look up a user by primary key.

        :param user_id: primary key handed to ``Query.get``
        :param cache: when true, attach the ``sql_cache_short`` ``FromCache``
            option (keyed on `user_id`) to the query
        :returns: the result of ``Query.get`` for `user_id`
        """
        user = User.query()
        if cache:
            user = user.options(FromCache("sql_cache_short",
                                          "get_user_%s" % user_id))
        return user.get(user_id)

    def get_user(self, user):
        """Resolve `user` through ``User.guess_instance`` and return it."""
        return User.guess_instance(user)

    def create(self, form_data, cur_user=None):
        """
        Create a new user from `form_data`, add it to the session and flush.

        Every key of `form_data` becomes an attribute of the new ``User``,
        except that ``password`` is hashed first and ``firstname`` is stored
        under the attribute ``name``. A fresh API key is generated.

        :param form_data: dict with at least the keys ``username``,
            ``password``, ``email``, ``firstname``, ``lastname`` and
            ``active``
        :param cur_user: acting user name handed to the hooks; defaults to
            the ``username`` of the current auth user (or None)
        :returns: the newly created ``User``
        """
        if not cur_user:
            cur_user = getattr(get_current_authuser(), 'username', None)

        from kallithea.lib.hooks import log_create_user, \
            check_allowed_create_user
        from kallithea.lib.auth import get_crypt_password

        user_data = {
            'username': form_data['username'],
            'password': form_data['password'],
            'email': form_data['email'],
            'firstname': form_data['firstname'],
            'lastname': form_data['lastname'],
            'active': form_data['active'],
            'admin': False
        }
        # raises UserCreationError if it's not allowed
        check_allowed_create_user(user_data, cur_user)

        new_user = User()
        for k, v in form_data.items():
            if k == 'password':
                v = get_crypt_password(v)
            if k == 'firstname':
                k = 'name'
            setattr(new_user, k, v)

        new_user.api_key = generate_api_key()
        Session().add(new_user)
        Session().flush()  # make database assign new_user.user_id

        log_create_user(new_user.get_dict(), cur_user)
        return new_user

    def create_or_update(self, username, password, email, firstname=u'',
                         lastname=u'', active=True, admin=False,
                         extern_type=None, extern_name=None, cur_user=None):
        """
        Creates a new instance if not found, or updates current one.

        The user is looked up by `username`, case-insensitively. The
        password is (re)hashed only when a user is created, or when the
        existing user has a password that `password` does not check out
        against. A new API key is generated only on creation.

        :param username:
        :param password: plain-text password; a false value stores ``''``
        :param email:
        :param firstname: stored under the attribute ``name``
        :param lastname:
        :param active:
        :param admin:
        :param extern_type:
        :param extern_name:
        :param cur_user: acting user name handed to the hooks; defaults to
            the ``username`` of the current auth user (or None)
        :returns: the created or updated ``User``
        :raises DatabaseError: logged with traceback, then re-raised
        """
        if not cur_user:
            cur_user = getattr(get_current_authuser(), 'username', None)

        from kallithea.lib.auth import get_crypt_password, check_password
        from kallithea.lib.hooks import log_create_user, \
            check_allowed_create_user
        user_data = {
            'username': username, 'password': password,
            'email': email, 'firstname': firstname, 'lastname': lastname,
            'active': active, 'admin': admin
        }
        # raises UserCreationError if it's not allowed
        check_allowed_create_user(user_data, cur_user)

        log.debug('Checking for %s account in Kallithea database', username)
        user = User.get_by_username(username, case_insensitive=True)
        if user is None:
            log.debug('creating new user %s', username)
            new_user = User()
            edit = False
        else:
            log.debug('updating user %s', username)
            new_user = user
            edit = True

        try:
            new_user.username = username
            new_user.admin = admin
            new_user.email = email
            new_user.active = active
            new_user.extern_name = extern_name
            new_user.extern_type = extern_type
            new_user.name = firstname
            new_user.lastname = lastname

            if not edit:
                new_user.api_key = generate_api_key()

            # set password only if creating an user or password is changed
            password_change = new_user.password and \
                not check_password(password, new_user.password)
            if not edit or password_change:
                reason = 'new password' if edit else 'new user'
                log.debug('Updating password reason=>%s', reason)
                new_user.password = get_crypt_password(password) \
                    if password else ''

            if user is None:
                Session().add(new_user)
                Session().flush()  # make database assign new_user.user_id

            if not edit:
                log_create_user(new_user.get_dict(), cur_user)

            return new_user
        except (DatabaseError,):
            log.error(traceback.format_exc())
            raise

    def create_registration(self, form_data):
        """
        Register a new user from `form_data` and create a registration
        notification.

        `form_data` is modified in place: ``admin`` is forced to False,
        ``extern_type`` to ``User.DEFAULT_AUTH_TYPE`` and ``extern_name``
        to ``''`` before it is handed to ``create``.

        :param form_data: dict as accepted by ``create``
        """
        from kallithea.model.notification import NotificationModel
        import kallithea.lib.helpers as h

        form_data['admin'] = False
        form_data['extern_type'] = User.DEFAULT_AUTH_TYPE
        form_data['extern_name'] = ''
        new_user = self.create(form_data)

        # notification to admins
        subject = _('New user registration')
        body = (
            u'New user registration\n'
            '---------------------\n'
            '- Username: {user.username}\n'
            '- Full Name: {user.full_name}\n'
            '- Email: {user.email}\n'
            ).format(user=new_user)
        edit_url = h.canonical_url('edit_user', id=new_user.user_id)
        email_kwargs = {
            'registered_user_url': edit_url,
            'new_username': new_user.username,
            'new_email': new_user.email,
            'new_full_name': new_user.full_name}
        NotificationModel().create(created_by=new_user, subject=subject,
                                   body=body, recipients=None,
                                   type_=NotificationModel.TYPE_REGISTRATION,
                                   email_kwargs=email_kwargs)

    def update(self, user_id, form_data, skip_attrs=None):
        """
        Update the user with primary key `user_id` from `form_data`.

        A true ``new_password`` value is hashed and stored as the password;
        ``firstname`` is stored under the attribute ``name``; every other
        key is set as an attribute as-is.

        :param user_id: primary key of the user to update
        :param form_data: dict of attribute names to new values
        :param skip_attrs: keys of `form_data` to ignore
        :raises DefaultUserException: if the user is the default user
        """
        from kallithea.lib.auth import get_crypt_password

        skip_attrs = skip_attrs or []
        user = self.get(user_id, cache=False)
        if user.is_default_user:
            raise DefaultUserException(
                            _("You can't edit this user since it's "
                              "crucial for entire application"))

        for k, v in form_data.items():
            if k in skip_attrs:
                continue
            if k == 'new_password' and v:
                user.password = get_crypt_password(v)
            else:
                # old legacy thing orm models store firstname as name,
                # need proper refactor to username
                if k == 'firstname':
                    k = 'name'
                setattr(user, k, v)

    def update_user(self, user, **kwargs):
        """
        Set the given keyword arguments as attributes on `user`.

        A true ``password`` value is hashed before being stored. Unlike
        ``update``, keys are not renamed.

        :param user: resolved through ``User.guess_instance``
        :returns: the updated ``User``
        :raises DefaultUserException: if the user is the default user
        """
        from kallithea.lib.auth import get_crypt_password

        user = User.guess_instance(user)
        if user.is_default_user:
            raise DefaultUserException(
                _("You can't edit this user since it's"
                  " crucial for entire application")
            )

        for k, v in kwargs.items():
            if k == 'password' and v:
                v = get_crypt_password(v)

            setattr(user, k, v)
        return user

    def delete(self, user, cur_user=None):
        """
        Delete `user` from the session and call the ``log_delete_user`` hook.

        :param user: resolved through ``User.guess_instance``
        :param cur_user: acting user name handed to the hook; defaults to
            the ``username`` of the current auth user (or None)
        :raises DefaultUserException: if the user is the default user
        :raises UserOwnsReposException: if the user still owns
            repositories, repository groups or user groups
        """
        if cur_user is None:
            cur_user = getattr(get_current_authuser(), 'username', None)
        user = User.guess_instance(user)

        if user.is_default_user:
            raise DefaultUserException(
                _("You can't remove this user since it is"
                  " crucial for the entire application"))
        if user.repositories:
            repos = [x.repo_name for x in user.repositories]
            raise UserOwnsReposException(
                _('User "%s" still owns %s repositories and cannot be '
                  'removed. Switch owners or remove those repositories: %s')
                % (user.username, len(repos), ', '.join(repos)))
        if user.repo_groups:
            repogroups = [x.group_name for x in user.repo_groups]
            raise UserOwnsReposException(_(
                'User "%s" still owns %s repository groups and cannot be '
                'removed. Switch owners or remove those repository groups: %s')
                % (user.username, len(repogroups), ', '.join(repogroups)))
        if user.user_groups:
            usergroups = [x.users_group_name for x in user.user_groups]
            raise UserOwnsReposException(
                _('User "%s" still owns %s user groups and cannot be '
                  'removed. Switch owners or remove those user groups: %s')
                % (user.username, len(usergroups), ', '.join(usergroups)))
        Session().delete(user)

        from kallithea.lib.hooks import log_delete_user
        log_delete_user(user.get_dict(), cur_user)

    def can_change_password(self, user):
        """
        Return True unless ``password`` is among the fields that
        ``auth_modules.get_managed_fields`` reports for `user`.
        """
        from kallithea.lib import auth_modules
        managed_fields = auth_modules.get_managed_fields(user)
        return 'password' not in managed_fields

    def get_reset_password_token(self, user, timestamp, session_id):
        """
        The token is a 40-digit hexstring, calculated as a HMAC-SHA1.

        In a traditional HMAC scenario, an attacker is unable to know or
        influence the secret key, but can know or influence the message
        and token. This scenario is slightly different (in particular
        since the message sender is also the message recipient), but
        sufficiently similar to use an HMAC. Benefits compared to a plain
        SHA1 hash includes resistance against a length extension attack.

        The HMAC key consists of the following values (known only to the
        server and authorized users):

        * per-application secret (the `app_instance_uuid` setting), without
          which an attacker cannot counterfeit tokens
        * hashed user password, invalidating the token upon password change

        The HMAC message consists of the following values (potentially known
        to an attacker):

        * session ID (the anti-CSRF token), requiring an attacker to have
          access to the browser session in which the token was created
        * numeric user ID, limiting the token to a specific user (yet allowing
          users to be renamed)
        * user email address
        * time of token issue (a Unix timestamp, to enable token expiration)

        The key and message values are separated by NUL characters, which are
        guaranteed not to occur in any of the values.

        :param user: object providing ``password``, ``user_id`` and ``email``
        :param timestamp: time of issue; included in the message as
            ``str(timestamp)``
        :param session_id: session secret string included in the message
        :returns: hex digest string of the HMAC
        """
        app_secret = config.get('app_instance_uuid')
        return hmac.HMAC(
            key=u'\0'.join([app_secret, user.password]).encode('utf-8'),
            msg=u'\0'.join([session_id, str(user.user_id), user.email, str(timestamp)]).encode('utf-8'),
            digestmod=hashlib.sha1,
        ).hexdigest()

    def send_reset_password_email(self, data):
        """
        Sends email with a password reset token and link to the password
        reset confirmation page with all information (including the token)
        pre-filled. Also returns URL of that page, only without the token,
        allowing users to copy-paste or manually enter the token from the
        email.

        No email is sent when no user matches ``data['email']``; the URL is
        returned either way. For a user whose password cannot be changed
        here, the email is rendered with token and link set to None.

        :param data: dict with the key ``email``
        :returns: URL of the confirmation page (email and timestamp only)
        """
        from kallithea.lib.celerylib import tasks
        from kallithea.model.notification import EmailNotificationModel
        import kallithea.lib.helpers as h

        user_email = data['email']
        user = User.get_by_email(user_email)
        timestamp = int(time.time())
        if user is not None:
            if self.can_change_password(user):
                log.debug('password reset user %s found', user)
                token = self.get_reset_password_token(user,
                                                      timestamp,
                                                      h.session_csrf_secret_token())
                # URL must be fully qualified; but since the token is locked to
                # the current browser session, we must provide a URL with the
                # current scheme and hostname, rather than the canonical_url.
                link = h.url('reset_password_confirmation', qualified=True,
                             email=user_email,
                             timestamp=timestamp,
                             token=token)
            else:
                log.debug('password reset user %s found but was managed', user)
                token = link = None

            reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
            body = EmailNotificationModel().get_email_tmpl(
                reg_type, 'txt',
                user=user.short_contact,
                reset_token=token,
                reset_url=link)
            html_body = EmailNotificationModel().get_email_tmpl(
                reg_type, 'html',
                user=user.short_contact,
                reset_token=token,
                reset_url=link)
            log.debug('sending email')
            tasks.send_email([user_email], _("Password reset link"), body, html_body)
            log.info('send new password mail to %s', user_email)
        else:
            log.debug("password reset email %s not found", user_email)

        return h.url('reset_password_confirmation',
                     email=user_email,
                     timestamp=timestamp)

    @staticmethod
    def _tokens_match(expected_token, received_token):
        """
        Compare two token strings in constant time.

        :returns: True only if both are text strings with equal content;
            False if either cannot be encoded as UTF-8 text (e.g. None)
        """
        try:
            expected = expected_token.encode('utf-8')
            received = received_token.encode('utf-8')
        except (AttributeError, UnicodeError):
            return False
        # compare_digest avoids leaking, through timing, how many leading
        # characters of a guessed token were correct.
        return hmac.compare_digest(expected, received)

    def verify_reset_password_token(self, email, timestamp, token):
        """
        Check a password reset token received from a user.

        :param email: email address used to look up the user
        :param timestamp: time of issue as received; must convert to int
        :param token: token string as received
        :returns: True if the user exists, the timestamp is a valid integer
            that is neither in the future nor older than
            ``password_reset_token_lifetime`` seconds, and `token` equals
            the token recomputed for the current browser session;
            False otherwise
        """
        import kallithea.lib.helpers as h
        user = User.get_by_email(email)
        if user is None:
            log.debug("user with email %s not found", email)
            return False

        try:
            issued_at = int(timestamp)
        except (TypeError, ValueError):
            # Malformed input is an invalid token, not a server error.
            log.debug('password reset timestamp is not a valid integer')
            return False

        token_age = int(time.time()) - issued_at
        if token_age < 0:
            log.debug('timestamp is from the future')
            return False
        if token_age > UserModel.password_reset_token_lifetime:
            log.debug('password reset token expired')
            return False

        # The timestamp is passed on exactly as received, so the HMAC
        # message is built from the same text that was put into the link.
        expected_token = self.get_reset_password_token(user,
                                                       timestamp,
                                                       h.session_csrf_secret_token())
        # NOTE(review): both tokens end up in the debug log - confirm that
        # this is acceptable for deployments running with debug logging.
        log.debug('computed password reset token: %s', expected_token)
        log.debug('received password reset token: %s', token)
        return self._tokens_match(expected_token, token)

    def reset_password(self, user_email, new_passwd):
        """
        Set a new password for the user with email `user_email`, commit,
        and send a notification email to that address.

        All checks are made before anything is changed, so a failed call
        neither alters the password nor sends an email.

        :param user_email: email address used to look up the user
        :param new_passwd: new plain-text password; must not be None
        :returns: True
        :raises Exception: if no user has this email, if the user's
            password is managed externally, or if `new_passwd` is None
        """
        from kallithea.lib.celerylib import tasks
        from kallithea.lib import auth

        user = User.get_by_email(user_email)
        if user is None:
            raise Exception('unable to reset password: no user with email %s'
                            % (user_email,))
        if not self.can_change_password(user):
            raise Exception('trying to change password for external user')
        if new_passwd is None:
            raise Exception('unable to set new password')

        user.password = auth.get_crypt_password(new_passwd)
        Session().commit()
        log.info('change password for %s', user_email)

        tasks.send_email([user_email],
            _('Password reset notification'),
            _('The password to your account %s has been changed using password reset form.') % (user.username,))
        log.info('send password reset mail to %s', user_email)

        return True

    def has_perm(self, user, perm):
        """
        Return True if a ``UserToPerm`` row exists for `user` and `perm`.

        :param user: resolved through ``User.guess_instance``
        :param perm: resolved through ``Permission.guess_instance``
        """
        perm = Permission.guess_instance(perm)
        user = User.guess_instance(user)

        return UserToPerm.query().filter(UserToPerm.user == user) \
            .filter(UserToPerm.permission == perm).scalar() is not None

    def grant_perm(self, user, perm):
        """
        Grant user global permissions

        :param user: resolved through ``User.guess_instance``
        :param perm: resolved through ``Permission.guess_instance``
        :returns: the new ``UserToPerm`` added to the session, or None if
            the permission was already granted
        """
        user = User.guess_instance(user)
        perm = Permission.guess_instance(perm)
        # if this permission is already granted skip it
        _perm = UserToPerm.query() \
            .filter(UserToPerm.user == user) \
            .filter(UserToPerm.permission == perm) \
            .scalar()
        if _perm is not None:
            return
        new = UserToPerm()
        new.user = user
        new.permission = perm
        Session().add(new)
        return new

    def revoke_perm(self, user, perm):
        """
        Revoke users global permissions

        :param user: resolved through ``User.guess_instance``
        :param perm: resolved through ``Permission.guess_instance``
        """
        user = User.guess_instance(user)
        perm = Permission.guess_instance(perm)

        UserToPerm.query().filter(
            UserToPerm.user == user,
            UserToPerm.permission == perm,
        ).delete()

    def add_extra_email(self, user, email):
        """
        Adds email address to UserEmailMap

        The address is passed through ``forms.UserExtraEmailForm`` first.

        :param user: resolved through ``User.guess_instance``
        :param email: email address to add
        :returns: the new ``UserEmailMap`` added to the session
        """
        from kallithea.model import forms
        form = forms.UserExtraEmailForm()()
        data = form.to_python(dict(email=email))
        user = User.guess_instance(user)

        obj = UserEmailMap()
        obj.user = user
        obj.email = data['email']
        Session().add(obj)
        return obj

    def delete_extra_email(self, user, email_id):
        """
        Removes email address from UserEmailMap

        Does nothing if no row has the id `email_id`.

        :param user: resolved through ``User.guess_instance``
        :param email_id: primary key of the ``UserEmailMap`` row
        """
        # NOTE(review): the row is looked up by id only; it is not checked
        # that it belongs to `user` - confirm callers guarantee this.
        user = User.guess_instance(user)
        obj = UserEmailMap.query().get(email_id)
        if obj is not None:
            Session().delete(obj)

    def add_extra_ip(self, user, ip):
        """
        Adds IP address to UserIpMap

        The address is passed through ``forms.UserExtraIpForm`` first.

        :param user: resolved through ``User.guess_instance``
        :param ip: IP address to add
        :returns: the new ``UserIpMap`` added to the session
        """
        from kallithea.model import forms
        form = forms.UserExtraIpForm()()
        data = form.to_python(dict(ip=ip))
        user = User.guess_instance(user)

        obj = UserIpMap()
        obj.user = user
        obj.ip_addr = data['ip']
        Session().add(obj)
        return obj

    def delete_extra_ip(self, user, ip_id):
        """
        Removes IP address from UserIpMap

        Does nothing if no row has the id `ip_id`.

        :param user: resolved through ``User.guess_instance``
        :param ip_id: primary key of the ``UserIpMap`` row
        """
        # NOTE(review): the row is looked up by id only; it is not checked
        # that it belongs to `user` - confirm callers guarantee this.
        user = User.guess_instance(user)
        obj = UserIpMap.query().get(ip_id)
        if obj is not None:
            Session().delete(obj)