users.py
510 lines
| 19.0 KiB
| text/x-python
|
PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2016-2017 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging | ||||
r1630 | import datetime | |||
r1821 | import formencode | |||
r1518 | ||||
from pyramid.httpexceptions import HTTPFound | ||||
from pyramid.view import view_config | ||||
r1630 | from sqlalchemy.sql.functions import coalesce | |||
r1559 | ||||
r1821 | from rhodecode.apps._base import BaseAppView, DataGridAppView | |||
from rhodecode.lib import audit_logger | ||||
r1750 | from rhodecode.lib.ext_json import json | |||
r1518 | from rhodecode.lib.auth import ( | |||
LoginRequired, HasPermissionAllDecorator, CSRFRequired) | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.lib.utils import PartialRenderer | ||||
r1520 | from rhodecode.lib.utils2 import safe_int, safe_unicode | |||
r1518 | from rhodecode.model.auth_token import AuthTokenModel | |||
r1559 | from rhodecode.model.user import UserModel | |||
Bartłomiej Wołyńczyk
|
r1556 | from rhodecode.model.user_group import UserGroupModel | ||
r1821 | from rhodecode.model.db import User, or_, UserIpMap, UserEmailMap, UserApiKeys | |||
r1518 | from rhodecode.model.meta import Session | |||
log = logging.getLogger(__name__) | ||||
class AdminUsersView(BaseAppView, DataGridAppView):
    """
    Admin views for the users grid and for editing a single user's auth
    tokens, extra emails, IP whitelist, user groups and audit logs.

    This view has alternative version inside EE, if modified please take a look
    in there as well.
    """

    # copied onto the template context as `c.allow_scoped_tokens`
    ALLOW_SCOPED_TOKENS = False

    def load_default_context(self):
        """
        Build the template context shared by every view of this class.

        :return: the local template context, with `allow_scoped_tokens` set
            and after being handed to `_register_global_c`.
        """
        c = self._get_local_tmpl_context()
        c.allow_scoped_tokens = self.ALLOW_SCOPED_TOKENS
        self._register_global_c(c)
        return c

    def _redirect_for_default_user(self, username):
        """
        Abort the current view when `username` is the default user.

        :param username: username of the user being edited.
        :raises HTTPFound: redirect to `/` (after flashing a warning) when
            `username` equals `User.DEFAULT_USER`.
        """
        _ = self.request.translate
        if username == User.DEFAULT_USER:
            h.flash(_("You can't edit this user"), category='warning')
            # TODO(marcink): redirect to 'users' admin panel once this
            # is a pyramid view
            raise HTTPFound('/')

    # NOTE: @LoginRequired() added so this view is guarded the same way as
    # every other view of this class.
    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @view_config(
        route_name='users', request_method='GET',
        renderer='rhodecode:templates/admin/users/users.mako')
    def users_list(self):
        """
        Render the users admin page; rows are fetched via `users_list_data`.
        """
        c = self.load_default_context()
        return self._get_template_context(c)

    # NOTE: @LoginRequired() added so this view is guarded the same way as
    # every other view of this class.
    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @view_config(
        route_name='users_data', request_method='GET',
        renderer='json_ext', xhr=True)
    def users_list_data(self):
        """
        Return one chunk of users for the data grid.

        Paging, search term and ordering are read from the request via
        `_extract_chunk` and `_extract_ordering`. The default user is never
        included.

        :return: dict with keys `draw`, `data`, `recordsTotal` and
            `recordsFiltered`.
        """
        draw, start, limit = self._extract_chunk(self.request)
        search_q, order_by, order_dir = self._extract_ordering(self.request)

        _render = PartialRenderer('data_table/_dt_elements.mako')

        def user_actions(user_id, username):
            return _render("user_actions", user_id, username)

        # one base query serves both the total and the filtered count
        base_q = User.query().filter(User.username != User.DEFAULT_USER)
        users_data_total_count = base_q.count()

        if search_q:
            like_expression = u'%{}%'.format(safe_unicode(search_q))
            base_q = base_q.filter(or_(
                User.username.ilike(like_expression),
                User._email.ilike(like_expression),
                User.name.ilike(like_expression),
                User.lastname.ilike(like_expression),
            ))

        users_data_total_filtered_count = base_q.count()

        sort_col = getattr(User, order_by, None)
        # explicit None check: do not rely on the truthiness of a
        # SQLAlchemy attribute/expression object
        if sort_col is not None:
            ascending = order_dir == 'asc'
            if order_by in ['last_activity']:
                # handle null values properly to order by NULL last:
                # NULLs are replaced by the extreme date of the direction
                null_fallback = (
                    datetime.date.max if ascending else datetime.date.min)
                sort_col = coalesce(sort_col, null_fallback)
            sort_col = sort_col.asc() if ascending else sort_col.desc()
            base_q = base_q.order_by(sort_col)

        base_q = base_q.offset(start).limit(limit)

        users_data = [
            {
                "username": h.gravatar_with_user(user.username),
                "email": user.email,
                "first_name": user.first_name,
                "last_name": user.last_name,
                "last_login": h.format_date(user.last_login),
                "last_activity": h.format_date(user.last_activity),
                "active": h.bool2icon(user.active),
                "active_raw": user.active,
                "admin": h.bool2icon(user.admin),
                "extern_type": user.extern_type,
                "extern_name": user.extern_name,
                "action": user_actions(user.user_id, user.username),
            }
            for user in base_q.all()
        ]

        return {
            'draw': draw,
            'data': users_data,
            'recordsTotal': users_data_total_count,
            'recordsFiltered': users_data_total_filtered_count,
        }

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @view_config(
        route_name='edit_user_auth_tokens', request_method='GET',
        renderer='rhodecode:templates/admin/users/user_edit.mako')
    def auth_tokens(self):
        """
        Render the auth tokens tab of the user edit page.

        Raises a 404 (via `User.get_or_404`) for an unknown `user_id` and
        redirects away for the default user.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        self._redirect_for_default_user(c.user.username)

        c.active = 'auth_tokens'

        # values are passed to the token model as `lifetime`
        # (NOTE(review): appear to be minutes, -1 meaning no expiry)
        c.lifetime_values = [
            (str(-1), _('forever')),
            (str(5), _('5 minutes')),
            (str(60), _('1 hour')),
            (str(60 * 24), _('1 day')),
            (str(60 * 24 * 30), _('1 month')),
        ]
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
        c.role_values = [
            (x, AuthTokenModel.cls._get_role_name(x))
            for x in AuthTokenModel.cls.ROLES]
        c.role_options = [(c.role_values, _("Role"))]
        c.user_auth_tokens = AuthTokenModel().get_auth_tokens(
            c.user.user_id, show_expired=True)
        return self._get_template_context(c)

    def maybe_attach_token_scope(self, token):
        """
        Hook called by `auth_tokens_add` with the freshly created token.

        No-op here; implemented in EE edition.
        """
        # implemented in EE edition
        pass

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    @view_config(
        route_name='edit_user_auth_tokens_add', request_method='POST')
    def auth_tokens_add(self):
        """
        Create an auth token for the user from POST fields `lifetime`,
        `description` and `role`, store an audit entry and commit.

        :return: HTTPFound redirect back to the user's auth tokens page.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)

        self._redirect_for_default_user(c.user.username)

        user_data = c.user.get_api_data()
        # a missing or non-integer lifetime falls back to -1
        lifetime = safe_int(self.request.POST.get('lifetime'), -1)
        description = self.request.POST.get('description')
        role = self.request.POST.get('role')

        token = AuthTokenModel().create(
            c.user.user_id, description, lifetime, role)
        token_data = token.get_api_data()

        self.maybe_attach_token_scope(token)
        audit_logger.store_web(
            action='user.edit.token.add',
            action_data={'data': {'token': token_data, 'user': user_data}},
            user=self._rhodecode_user, )
        Session().commit()

        h.flash(_("Auth token successfully created"), category='success')
        return HTTPFound(h.route_path('edit_user_auth_tokens', user_id=user_id))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    @view_config(
        route_name='edit_user_auth_tokens_delete', request_method='POST')
    def auth_tokens_delete(self):
        """
        Delete the auth token given in POST field `del_auth_token`, store an
        audit entry and commit. Does nothing when the field is empty.

        :return: HTTPFound redirect back to the user's auth tokens page.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        self._redirect_for_default_user(c.user.username)
        user_data = c.user.get_api_data()

        del_auth_token = self.request.POST.get('del_auth_token')

        if del_auth_token:
            # token data has to be captured before the row is deleted
            token = UserApiKeys.get_or_404(del_auth_token, pyramid_exc=True)
            token_data = token.get_api_data()

            AuthTokenModel().delete(del_auth_token, c.user.user_id)
            audit_logger.store_web(
                action='user.edit.token.delete',
                action_data={'data': {'token': token_data, 'user': user_data}},
                user=self._rhodecode_user,)
            Session().commit()
            h.flash(_("Auth token successfully deleted"), category='success')

        return HTTPFound(h.route_path('edit_user_auth_tokens', user_id=user_id))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @view_config(
        route_name='edit_user_emails', request_method='GET',
        renderer='rhodecode:templates/admin/users/user_edit.mako')
    def emails(self):
        """
        Render the emails tab of the user edit page, listing the
        `UserEmailMap` rows of the user.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        self._redirect_for_default_user(c.user.username)

        c.active = 'emails'
        c.user_email_map = UserEmailMap.query() \
            .filter(UserEmailMap.user == c.user).all()

        return self._get_template_context(c)

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    @view_config(
        route_name='edit_user_emails_add', request_method='POST')
    def emails_add(self):
        """
        Add the address from POST field `new_email` as an extra email of the
        user. Validation and other errors are flashed, not raised.

        :raises HTTPFound: always, redirecting to the user's emails page.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        self._redirect_for_default_user(c.user.username)

        email = self.request.POST.get('new_email')
        user_data = c.user.get_api_data()
        try:
            UserModel().add_extra_email(c.user.user_id, email)
            audit_logger.store_web(
                'user.edit.email.add',
                action_data={'email': email, 'user': user_data},
                user=self._rhodecode_user)
            Session().commit()
            h.flash(_("Added new email address `%s` for user account") % email,
                    category='success')
        except formencode.Invalid as error:
            msg = error.error_dict['email']
            h.flash(msg, category='error')
        except Exception:
            # best-effort: log, tell the admin, and still redirect below
            log.exception("Exception during email saving")
            h.flash(_('An error occurred during email saving'),
                    category='error')
        raise HTTPFound(h.route_path('edit_user_emails', user_id=user_id))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    @view_config(
        route_name='edit_user_emails_delete', request_method='POST')
    def emails_delete(self):
        """
        Remove the extra email identified by POST field `del_email_id`,
        store an audit entry and commit.

        :raises HTTPFound: on success, redirecting to the user's emails page.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        self._redirect_for_default_user(c.user.username)

        email_id = self.request.POST.get('del_email_id')
        user_model = UserModel()

        # get_or_404 (as used for tokens above) replaces a bare
        # `.query().get(...).email`, which crashed with AttributeError when
        # the id was missing or unknown; read before the row is deleted
        email = UserEmailMap.get_or_404(email_id, pyramid_exc=True).email
        user_data = c.user.get_api_data()
        user_model.delete_extra_email(c.user.user_id, email_id)
        audit_logger.store_web(
            'user.edit.email.delete',
            action_data={'email': email, 'user': user_data},
            user=self._rhodecode_user)
        Session().commit()
        h.flash(_("Removed email address from user account"),
                category='success')
        raise HTTPFound(h.route_path('edit_user_emails', user_id=user_id))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @view_config(
        route_name='edit_user_ips', request_method='GET',
        renderer='rhodecode:templates/admin/users/user_edit.mako')
    def ips(self):
        """
        Render the IP whitelist tab of the user edit page, with both the
        user's own `UserIpMap` rows and those of the default user.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        self._redirect_for_default_user(c.user.username)

        c.active = 'ips'
        c.user_ip_map = UserIpMap.query() \
            .filter(UserIpMap.user == c.user).all()

        c.inherit_default_ips = c.user.inherit_default_permissions
        c.default_user_ip_map = UserIpMap.query() \
            .filter(UserIpMap.user == User.get_default_user()).all()

        return self._get_template_context(c)

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    @view_config(
        route_name='edit_user_ips_add', request_method='POST')
    def ips_add(self):
        """
        Add the IPs parsed from POST field `new_ip` to the user's whitelist.

        Each IP is stored and committed on its own, so one failing entry
        does not prevent the others from being added. Errors are flashed.

        :raises HTTPFound: always; to the global IP permissions page when
            `default_user` is in the POST data, else to the user's IPs page.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        # NOTE(marcink): this view is allowed for default users, as we can
        # edit their IP white list

        user_model = UserModel()
        desc = self.request.POST.get('description')
        try:
            ip_list = user_model.parse_ip_range(
                self.request.POST.get('new_ip'))
        except Exception as e:
            ip_list = []
            log.exception("Exception during ip saving")
            # interpolate AFTER translating, so the msgid looked up is the
            # constant template and not the already formatted message
            h.flash(_('An error occurred during ip saving:%s') % (e,),
                    category='error')

        added = []
        user_data = c.user.get_api_data()
        for ip in ip_list:
            try:
                user_model.add_extra_ip(c.user.user_id, ip, desc)
                audit_logger.store_web(
                    'user.edit.ip.add',
                    action_data={'ip': ip, 'user': user_data},
                    user=self._rhodecode_user)
                Session().commit()
                added.append(ip)
            except formencode.Invalid as error:
                msg = error.error_dict['ip']
                h.flash(msg, category='error')
            except Exception:
                log.exception("Exception during ip saving")
                h.flash(_('An error occurred during ip saving'),
                        category='error')

        if added:
            # report only the IPs actually stored, not everything parsed
            h.flash(
                _("Added ips %s to user whitelist") % (', '.join(added), ),
                category='success')

        if 'default_user' in self.request.POST:
            # case for editing global IP list we do it for 'DEFAULT' user
            raise HTTPFound(h.route_path('admin_permissions_ips'))
        raise HTTPFound(h.route_path('edit_user_ips', user_id=user_id))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    @view_config(
        route_name='edit_user_ips_delete', request_method='POST')
    def ips_delete(self):
        """
        Remove the whitelist entry identified by POST field `del_ip_id`,
        store an audit entry and commit.

        :raises HTTPFound: on success; to the global IP permissions page when
            `default_user` is in the POST data, else to the user's IPs page.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        # NOTE(marcink): this view is allowed for default users, as we can
        # edit their IP white list

        ip_id = self.request.POST.get('del_ip_id')
        user_model = UserModel()
        user_data = c.user.get_api_data()

        # get_or_404 replaces a bare `.query().get(...).ip_addr`, which
        # crashed with AttributeError when the id was missing or unknown;
        # read before the row is deleted
        ip = UserIpMap.get_or_404(ip_id, pyramid_exc=True).ip_addr
        user_model.delete_extra_ip(c.user.user_id, ip_id)
        audit_logger.store_web(
            'user.edit.ip.delete',
            action_data={'ip': ip, 'user': user_data},
            user=self._rhodecode_user)
        Session().commit()
        h.flash(_("Removed ip address from user whitelist"), category='success')

        if 'default_user' in self.request.POST:
            # case for editing global IP list we do it for 'DEFAULT' user
            raise HTTPFound(h.route_path('admin_permissions_ips'))
        raise HTTPFound(h.route_path('edit_user_ips', user_id=user_id))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @view_config(
        route_name='edit_user_groups_management', request_method='GET',
        renderer='rhodecode:templates/admin/users/user_edit.mako')
    def groups_management(self):
        """
        Render the user groups tab of the user edit page; `c.groups` holds
        the user's groups serialized as JSON.
        """
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        c.data = c.user.group_member
        self._redirect_for_default_user(c.user.username)

        groups = [UserGroupModel.get_user_groups_as_dict(group.users_group)
                  for group in c.user.group_member]
        c.groups = json.dumps(groups)
        c.active = 'groups'

        return self._get_template_context(c)

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @CSRFRequired()
    @view_config(
        route_name='edit_user_groups_management_updates', request_method='POST')
    def groups_management_updates(self):
        """
        Replace the user's group membership with the groups listed in the
        POST field(s) `users_group_id` and commit.

        :return: HTTPFound redirect back to the user groups tab.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        self._redirect_for_default_user(c.user.username)

        # set(): the same id submitted twice is looked up only once
        users_groups = set(self.request.POST.getall('users_group_id'))

        # a single model instance serves all lookups and the update
        user_group_model = UserGroupModel()
        users_groups_model = [
            user_group_model.get_group(safe_int(ugid))
            for ugid in users_groups]
        user_group_model.change_groups(c.user, users_groups_model)

        Session().commit()
        c.active = 'user_groups_management'
        h.flash(_("Groups successfully changed"), category='success')

        return HTTPFound(h.route_path(
            'edit_user_groups_management', user_id=user_id))

    @LoginRequired()
    @HasPermissionAllDecorator('hg.admin')
    @view_config(
        route_name='edit_user_audit_logs', request_method='GET',
        renderer='rhodecode:templates/admin/users/user_edit.mako')
    def user_audit_logs(self):
        """
        Render the audit log tab of the user edit page.

        GET parameter `page` selects the page (invalid values fall back
        to 1) and `filter` is passed on to `UserModel.get_user_log`.
        """
        _ = self.request.translate
        c = self.load_default_context()

        user_id = self.request.matchdict.get('user_id')
        c.user = User.get_or_404(user_id, pyramid_exc=True)
        self._redirect_for_default_user(c.user.username)

        c.active = 'audit'

        p = safe_int(self.request.GET.get('page', 1), 1)

        filter_term = self.request.GET.get('filter')
        user_log = UserModel().get_user_log(c.user, filter_term)

        def url_generator(**kw):
            # keep the active filter in the pager links
            if filter_term:
                kw['filter'] = filter_term
            return self.request.current_route_path(_query=kw)

        c.audit_logs = h.Page(
            user_log, page=p, items_per_page=10, url=url_generator)
        c.filter_term = filter_term
        return self._get_template_context(c)