users.py
436 lines
| 16.7 KiB
| text/x-python
|
PythonLexer
|
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.controllers.admin.users
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Users CRUD controller

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Apr 4, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""
import logging | ||||
import traceback | ||||
import formencode | ||||
from formencode import htmlfill | ||||
|
r6522 | from tg import request, tmpl_context as c, config, app_globals | ||
|
r6508 | from tg.i18n import ugettext as _ | ||
|
r4187 | from sqlalchemy.sql.expression import func | ||
|
r5543 | from webob.exc import HTTPFound, HTTPNotFound | ||
|
r4187 | |||
import kallithea | ||||
|
r6182 | from kallithea.config.routing import url | ||
|
r4187 | from kallithea.lib.exceptions import DefaultUserException, \ | ||
UserOwnsReposException, UserCreationError | ||||
from kallithea.lib import helpers as h | ||||
|
r6026 | from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator, \ | ||
|
r5215 | AuthUser | ||
|
r4187 | from kallithea.lib import auth_modules | ||
from kallithea.lib.base import BaseController, render | ||||
from kallithea.model.api_key import ApiKeyModel | ||||
from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm | ||||
from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm | ||||
from kallithea.model.user import UserModel | ||||
from kallithea.model.meta import Session | ||||
from kallithea.lib.utils import action_logger | ||||
|
r5215 | from kallithea.lib.utils2 import datetime_to_time, safe_int, generate_api_key | ||
|
r4187 | |||
# Module-level logger, named after this module; used by the controller
# actions below to record tracebacks of unexpected errors.
log = logging.getLogger(__name__)
class UsersController(BaseController):
    """REST Controller styled on the Atom Publishing Protocol.

    Admin-side controller for user accounts: listing, creating, updating and
    deleting users, plus the per-user edit pages for API keys, permissions,
    extra emails and the IP whitelist.  Actions that change state flash a
    status message and finish with an ``HTTPFound`` redirect.
    """

    @LoginRequired()
    @HasPermissionAnyDecorator('hg.admin')
    def _before(self, *args, **kwargs):
        """Run the base controller setup, then expose the configured
        ``available_permissions`` on the template context.

        The decorators restrict every action of this controller to logged-in
        users holding the ``hg.admin`` permission.
        """
        super(UsersController, self)._before(*args, **kwargs)
        c.available_permissions = config['available_permissions']

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _get_user_or_raise_if_default(self, id):
        """Return the user with the given id.

        Raises ``HTTPNotFound`` (after flashing a warning) when the lookup
        reports ``DefaultUserException``, i.e. the default user was requested.
        """
        try:
            return User.get_or_404(id, allow_default=False)
        except DefaultUserException:
            h.flash(_("The default user cannot be edited"), category='warning')
            raise HTTPNotFound

    def _render_edit_profile(self, user):
        """Render the user edit page with the 'profile' tab active.

        Sets ``c.user``, ``c.active``, ``c.perm_user`` and ``c.readonly``; the
        latter returns ``'readonly'`` for fields managed by the user's auth
        module and ``None`` for all other fields.
        """
        c.user = user
        c.active = 'profile'
        c.perm_user = AuthUser(dbuser=user)
        managed_fields = auth_modules.get_managed_fields(user)
        c.readonly = lambda n: 'readonly' if n in managed_fields else None
        return render('admin/users/user_edit.html')

    def _fill_edit_form(self, defaults):
        """Render the user edit template and pre-fill its form with *defaults*."""
        return htmlfill.render(
            render('admin/users/user_edit.html'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False)

    def _permission_defaults(self, user):
        """Return ``user.get_dict()`` extended with the three global
        permission flags shown on the 'advanced' and 'perms' tabs."""
        umodel = UserModel()
        defaults = user.get_dict()
        defaults.update({
            'create_repo_perm': umodel.has_perm(user, 'hg.create.repository'),
            'create_user_group_perm': umodel.has_perm(user,
                                                      'hg.usergroup.create.true'),
            'fork_repo_perm': umodel.has_perm(user, 'hg.fork.repository'),
        })
        return defaults

    # ------------------------------------------------------------------
    # listing / create / update / delete
    # ------------------------------------------------------------------

    def index(self, format='html'):
        """Render the users list page.

        Loads every non-default user, builds one data-table record per user
        and stores the result on ``c.data`` for the template.
        """
        # NOTE(review): order_by() is applied twice; presumably the second,
        # case-insensitive ordering was the intended one - confirm before
        # changing the query.
        c.users_list = User.query().order_by(User.username) \
            .filter_by(is_default_user=False) \
            .order_by(func.lower(User.username)) \
            .all()
        total_records = len(c.users_list)

        template = app_globals.mako_lookup.get_template(
            'data_table/_dt_elements.html')
        grav_tmpl = '<div class="gravatar">%s</div>'

        def render_def(def_name, user):
            # Render one of the template's defs for a single user row.
            return template.get_def(def_name).render(
                user.user_id, user.username, _=_, h=h, c=c)

        users_data = []
        for user in c.users_list:
            users_data.append({
                "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
                "raw_name": user.username,
                "username": render_def("user_name", user),
                "firstname": h.escape(user.name),
                "lastname": h.escape(user.lastname),
                "last_login": h.fmt_date(user.last_login),
                "last_login_raw": datetime_to_time(user.last_login),
                "active": h.boolicon(user.active),
                "admin": h.boolicon(user.admin),
                "extern_type": user.extern_type,
                "extern_name": user.extern_name,
                "action": render_def("user_actions", user),
            })

        c.data = {
            "totalRecords": total_records,
            "startIndex": 0,
            "sort": None,
            "dir": "asc",
            "records": users_data,
        }

        return render('admin/users/users.html')

    def create(self):
        """Create a user from the POSTed form.

        On validation failure the 'add user' form is re-rendered with the
        submitted values and error messages.  On success the request is
        redirected to the new user's edit page.  When creation fails for any
        other reason an error is flashed and the request is redirected to the
        users list.
        """
        c.default_extern_type = User.DEFAULT_AUTH_TYPE
        c.default_extern_name = ''
        user_model = UserModel()
        user_form = UserForm()()
        # Stays None unless the user was created and committed; the original
        # code redirected with ``user.user_id`` unconditionally, which raised
        # UnboundLocalError whenever one of the error branches was taken.
        new_user_id = None
        try:
            form_result = user_form.to_python(dict(request.POST))
            user = user_model.create(form_result)
            action_logger(request.authuser, 'admin_created_user:%s' % user.username,
                          None, request.ip_addr)
            h.flash(_('Created user %s') % user.username,
                    category='success')
            Session().commit()
            new_user_id = user.user_id
        except formencode.Invalid as errors:
            return htmlfill.render(
                render('admin/users/user_add.html'),
                defaults=errors.value,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)
        except UserCreationError as e:
            h.flash(e, 'error')
        except Exception:
            log.error(traceback.format_exc())
            h.flash(_('Error occurred during creation of user %s')
                    % request.POST.get('username'), category='error')

        if new_user_id is None:
            raise HTTPFound(location=url('users'))
        raise HTTPFound(location=url('edit_user', id=new_user_id))

    def new(self, format='html'):
        """Render the empty 'add user' form."""
        c.default_extern_type = User.DEFAULT_AUTH_TYPE
        c.default_extern_name = ''
        return render('admin/users/user_add.html')

    def update(self, id):
        """Update the user with the given id from the POSTed form.

        ``extern_type``, ``extern_name`` and any fields managed by the user's
        auth module are excluded from the update.  On validation failure the
        profile form is re-rendered with the errors; otherwise the request is
        redirected back to the user's edit page.

        Raises ``HTTPNotFound`` when no user object is found for *id*.
        """
        user_model = UserModel()
        user = user_model.get(id)
        if user is None:
            # presumably get() returns None for an unknown id; without this
            # guard ``user.email`` below fails with AttributeError.
            raise HTTPNotFound()
        _form = UserForm(edit=True, old_data={'user_id': id,
                                              'email': user.email})()
        form_result = {}
        try:
            form_result = _form.to_python(dict(request.POST))
            skip_attrs = ['extern_type', 'extern_name',
                         ] + auth_modules.get_managed_fields(user)

            user_model.update(id, form_result, skip_attrs=skip_attrs)
            usr = form_result['username']
            action_logger(request.authuser, 'admin_updated_user:%s' % usr,
                          None, request.ip_addr)
            h.flash(_('User updated successfully'), category='success')
            Session().commit()
        except formencode.Invalid as errors:
            defaults = errors.value
            e = errors.error_dict or {}
            defaults.update({
                'create_repo_perm': user_model.has_perm(id,
                                                        'hg.create.repository'),
                'fork_repo_perm': user_model.has_perm(id, 'hg.fork.repository'),
            })
            return htmlfill.render(
                self._render_edit_profile(user),
                defaults=defaults,
                errors=e,
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)
        except Exception:
            log.error(traceback.format_exc())
            # form_result is still {} when validation itself blew up, hence
            # .get() rather than indexing.
            h.flash(_('Error occurred during update of user %s')
                    % form_result.get('username'), category='error')
        raise HTTPFound(location=url('edit_user', id=id))

    def delete(self, id):
        """Delete the user with the given id and redirect to the users list.

        ``UserOwnsReposException`` and ``DefaultUserException`` are reported
        as warnings; any other failure is logged and reported as an error.
        """
        usr = User.get_or_404(id)
        try:
            UserModel().delete(usr)
            Session().commit()
            h.flash(_('Successfully deleted user'), category='success')
        except (UserOwnsReposException, DefaultUserException) as e:
            h.flash(e, category='warning')
        except Exception:
            log.error(traceback.format_exc())
            h.flash(_('An error occurred during deletion of user'),
                    category='error')
        raise HTTPFound(location=url('users'))

    # ------------------------------------------------------------------
    # edit pages
    # ------------------------------------------------------------------

    def edit(self, id, format='html'):
        """Render the 'profile' tab of the user edit page."""
        user = self._get_user_or_raise_if_default(id)
        defaults = user.get_dict()

        return htmlfill.render(
            self._render_edit_profile(user),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False)

    def edit_advanced(self, id):
        """Render the 'advanced' tab of the user edit page."""
        c.user = self._get_user_or_raise_if_default(id)
        c.active = 'advanced'
        c.perm_user = AuthUser(dbuser=c.user)

        return self._fill_edit_form(self._permission_defaults(c.user))

    def edit_api_keys(self, id):
        """Render the 'api_keys' tab, listing the user's API keys
        (expired ones included) and the selectable key lifetimes."""
        c.user = self._get_user_or_raise_if_default(id)
        c.active = 'api_keys'
        # Option values are passed to add_api_key() as the 'lifetime' field;
        # presumably expressed in minutes, -1 meaning no expiry.
        c.lifetime_values = [
            (str(-1), _('Forever')),
            (str(5), _('5 minutes')),
            (str(60), _('1 hour')),
            (str(60 * 24), _('1 day')),
            (str(60 * 24 * 30), _('1 month')),
        ]
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
        c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id,
                                                     show_expired=True)
        return self._fill_edit_form(c.user.get_dict())

    def add_api_key(self, id):
        """Create an API key for the user from the POSTed ``lifetime`` and
        ``description``, then redirect to the API keys tab.

        A missing or non-integer lifetime falls back to ``-1``.
        """
        c.user = self._get_user_or_raise_if_default(id)

        lifetime = safe_int(request.POST.get('lifetime'), -1)
        description = request.POST.get('description')
        ApiKeyModel().create(c.user.user_id, description, lifetime)
        Session().commit()
        h.flash(_("API key successfully created"), category='success')
        raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))

    def delete_api_key(self, id):
        """Reset the built-in API key or delete an additional one.

        When ``del_api_key_builtin`` is POSTed the user's own ``api_key`` is
        regenerated; otherwise the key named by ``del_api_key`` is deleted.
        With neither field present nothing is changed.  Always redirects to
        the API keys tab.
        """
        c.user = self._get_user_or_raise_if_default(id)

        api_key = request.POST.get('del_api_key')
        if request.POST.get('del_api_key_builtin'):
            c.user.api_key = generate_api_key()
            Session().commit()
            h.flash(_("API key successfully reset"), category='success')
        elif api_key:
            ApiKeyModel().delete(api_key, c.user.user_id)
            Session().commit()
            h.flash(_("API key successfully deleted"), category='success')

        raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))

    def update_account(self, id):
        """Placeholder action; intentionally does nothing."""
        pass

    def edit_perms(self, id):
        """Render the 'perms' tab of the user edit page."""
        c.user = self._get_user_or_raise_if_default(id)
        c.active = 'perms'
        c.perm_user = AuthUser(dbuser=c.user)

        return self._fill_edit_form(self._permission_defaults(c.user))

    def update_perms(self, id):
        """Replace the user's global permissions with the POSTed choices.

        All existing ``UserToPerm`` rows of the user are deleted, then one
        permission per form field is granted (the "allowed" or the "denied"
        variant).  Failures are logged and flashed; the request is always
        redirected back to the 'perms' tab.
        """
        user = self._get_user_or_raise_if_default(id)

        # (form field, permission when set, permission when unset)
        choices = (
            ('create_repo_perm', 'hg.create.repository', 'hg.create.none'),
            ('create_user_group_perm', 'hg.usergroup.create.true',
             'hg.usergroup.create.false'),
            ('fork_repo_perm', 'hg.fork.repository', 'hg.fork.none'),
        )
        try:
            form = CustomDefaultPermissionsForm()()
            form_result = form.to_python(request.POST)

            inherit_perms = form_result['inherit_default_permissions']
            user.inherit_default_permissions = inherit_perms
            user_model = UserModel()

            defs = UserToPerm.query() \
                .filter(UserToPerm.user == user) \
                .all()
            for ug in defs:
                Session().delete(ug)

            for field, granted, denied in choices:
                if form_result[field]:
                    user_model.grant_perm(id, granted)
                else:
                    user_model.grant_perm(id, denied)

            h.flash(_("Updated permissions"), category='success')
            Session().commit()
        except Exception:
            log.error(traceback.format_exc())
            h.flash(_('An error occurred during permissions saving'),
                    category='error')
        raise HTTPFound(location=url('edit_user_perms', id=id))

    # ------------------------------------------------------------------
    # extra emails
    # ------------------------------------------------------------------

    def edit_emails(self, id):
        """Render the 'emails' tab, listing the user's extra email addresses."""
        c.user = self._get_user_or_raise_if_default(id)
        c.active = 'emails'
        c.user_email_map = UserEmailMap.query() \
            .filter(UserEmailMap.user == c.user).all()

        return self._fill_edit_form(c.user.get_dict())

    def add_email(self, id):
        """Add the POSTed ``new_email`` to the user and redirect to the
        'emails' tab.  Validation errors and other failures are flashed."""
        # Called only for its check: rejects the default user / unknown ids.
        self._get_user_or_raise_if_default(id)

        email = request.POST.get('new_email')
        user_model = UserModel()
        try:
            user_model.add_extra_email(id, email)
            Session().commit()
            h.flash(_("Added email %s to user") % email, category='success')
        except formencode.Invalid as error:
            msg = error.error_dict['email']
            h.flash(msg, category='error')
        except Exception:
            log.error(traceback.format_exc())
            h.flash(_('An error occurred during email saving'),
                    category='error')
        raise HTTPFound(location=url('edit_user_emails', id=id))

    def delete_email(self, id):
        """Remove the extra email identified by the POSTed ``del_email_id``
        and redirect to the 'emails' tab."""
        # Called only for its check: rejects the default user / unknown ids.
        self._get_user_or_raise_if_default(id)

        email_id = request.POST.get('del_email_id')
        user_model = UserModel()
        user_model.delete_extra_email(id, email_id)
        Session().commit()
        h.flash(_("Removed email from user"), category='success')
        raise HTTPFound(location=url('edit_user_emails', id=id))

    # ------------------------------------------------------------------
    # IP whitelist
    # ------------------------------------------------------------------

    def edit_ips(self, id):
        """Render the 'ips' tab with the user's own IP entries and those of
        the default user."""
        c.user = self._get_user_or_raise_if_default(id)
        c.active = 'ips'
        c.user_ip_map = UserIpMap.query() \
            .filter(UserIpMap.user == c.user).all()

        c.inherit_default_ips = c.user.inherit_default_permissions
        c.default_user_ip_map = UserIpMap.query() \
            .filter(UserIpMap.user == User.get_default_user()).all()

        return self._fill_edit_form(c.user.get_dict())

    def add_ip(self, id):
        """Add the POSTed ``new_ip`` to the user's IP whitelist.

        Unlike the email actions, no default-user check is made here.  When
        the POST contains ``default_user`` the redirect goes to the global
        permissions IP page instead of the user's 'ips' tab.
        """
        ip = request.POST.get('new_ip')
        user_model = UserModel()

        try:
            user_model.add_extra_ip(id, ip)
            Session().commit()
            h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
        except formencode.Invalid as error:
            msg = error.error_dict['ip']
            h.flash(msg, category='error')
        except Exception:
            log.error(traceback.format_exc())
            h.flash(_('An error occurred while adding IP address'),
                    category='error')

        if 'default_user' in request.POST:
            raise HTTPFound(location=url('admin_permissions_ips'))
        raise HTTPFound(location=url('edit_user_ips', id=id))

    def delete_ip(self, id):
        """Remove the whitelist entry identified by the POSTed ``del_ip_id``.

        Redirects to the global permissions IP page when the POST contains
        ``default_user``, otherwise to the user's 'ips' tab.
        """
        ip_id = request.POST.get('del_ip_id')
        user_model = UserModel()
        user_model.delete_extra_ip(id, ip_id)
        Session().commit()
        h.flash(_("Removed IP address from user whitelist"), category='success')

        if 'default_user' in request.POST:
            raise HTTPFound(location=url('admin_permissions_ips'))
        raise HTTPFound(location=url('edit_user_ips', id=id))