##// END OF EJS Templates
diffs: use whole chunk diff to calculate if it's oversized or not....
diffs: use whole chunk diff to calculate if it's oversized or not. - This fixes an issue if a file is added that has very large number of small lines. In this case the time to detect if the diff should be limited was very very long and CPU intensive.

File last commit:

r2067:e89cfa8f default
r2070:7939c6bf default
Show More
users.py
673 lines | 24.7 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2016-2017 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
import datetime
import formencode
from pyramid.httpexceptions import HTTPFound
from pyramid.view import view_config
from sqlalchemy.sql.functions import coalesce
from sqlalchemy.exc import IntegrityError
from rhodecode.apps._base import BaseAppView, DataGridAppView
from rhodecode.apps.ssh_support import SshKeyFileChangeEvent
from rhodecode.events import trigger
from rhodecode.lib import audit_logger
from rhodecode.lib.ext_json import json
from rhodecode.lib.auth import (
LoginRequired, HasPermissionAllDecorator, CSRFRequired)
from rhodecode.lib import helpers as h
from rhodecode.lib.utils2 import safe_int, safe_unicode
from rhodecode.model.auth_token import AuthTokenModel
from rhodecode.model.ssh_key import SshKeyModel
from rhodecode.model.user import UserModel
from rhodecode.model.user_group import UserGroupModel
from rhodecode.model.db import (
or_, User, UserIpMap, UserEmailMap, UserApiKeys, UserSshKeys)
from rhodecode.model.meta import Session
log = logging.getLogger(__name__)
class AdminUsersView(BaseAppView, DataGridAppView):
ALLOW_SCOPED_TOKENS = False
"""
This view has alternative version inside EE, if modified please take a look
in there as well.
"""
def load_default_context(self):
c = self._get_local_tmpl_context()
c.allow_scoped_tokens = self.ALLOW_SCOPED_TOKENS
self._register_global_c(c)
return c
def _redirect_for_default_user(self, username):
_ = self.request.translate
if username == User.DEFAULT_USER:
h.flash(_("You can't edit this user"), category='warning')
# TODO(marcink): redirect to 'users' admin panel once this
# is a pyramid view
raise HTTPFound('/')
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='users', request_method='GET',
renderer='rhodecode:templates/admin/users/users.mako')
def users_list(self):
c = self.load_default_context()
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
# renderer defined below
route_name='users_data', request_method='GET',
renderer='json_ext', xhr=True)
def users_list_data(self):
column_map = {
'first_name': 'name',
'last_name': 'lastname',
}
draw, start, limit = self._extract_chunk(self.request)
search_q, order_by, order_dir = self._extract_ordering(
self.request, column_map=column_map)
_render = self.request.get_partial_renderer(
'data_table/_dt_elements.mako')
def user_actions(user_id, username):
return _render("user_actions", user_id, username)
users_data_total_count = User.query()\
.filter(User.username != User.DEFAULT_USER) \
.count()
# json generate
base_q = User.query().filter(User.username != User.DEFAULT_USER)
if search_q:
like_expression = u'%{}%'.format(safe_unicode(search_q))
base_q = base_q.filter(or_(
User.username.ilike(like_expression),
User._email.ilike(like_expression),
User.name.ilike(like_expression),
User.lastname.ilike(like_expression),
))
users_data_total_filtered_count = base_q.count()
sort_col = getattr(User, order_by, None)
if sort_col:
if order_dir == 'asc':
# handle null values properly to order by NULL last
if order_by in ['last_activity']:
sort_col = coalesce(sort_col, datetime.date.max)
sort_col = sort_col.asc()
else:
# handle null values properly to order by NULL last
if order_by in ['last_activity']:
sort_col = coalesce(sort_col, datetime.date.min)
sort_col = sort_col.desc()
base_q = base_q.order_by(sort_col)
base_q = base_q.offset(start).limit(limit)
users_list = base_q.all()
users_data = []
for user in users_list:
users_data.append({
"username": h.gravatar_with_user(self.request, user.username),
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"last_login": h.format_date(user.last_login),
"last_activity": h.format_date(user.last_activity),
"active": h.bool2icon(user.active),
"active_raw": user.active,
"admin": h.bool2icon(user.admin),
"extern_type": user.extern_type,
"extern_name": user.extern_name,
"action": user_actions(user.user_id, user.username),
})
data = ({
'draw': draw,
'data': users_data,
'recordsTotal': users_data_total_count,
'recordsFiltered': users_data_total_filtered_count,
})
return data
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_auth_tokens', request_method='GET',
renderer='rhodecode:templates/admin/users/user_edit.mako')
def auth_tokens(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
c.active = 'auth_tokens'
c.lifetime_values = [
(str(-1), _('forever')),
(str(5), _('5 minutes')),
(str(60), _('1 hour')),
(str(60 * 24), _('1 day')),
(str(60 * 24 * 30), _('1 month')),
]
c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
c.role_values = [
(x, AuthTokenModel.cls._get_role_name(x))
for x in AuthTokenModel.cls.ROLES]
c.role_options = [(c.role_values, _("Role"))]
c.user_auth_tokens = AuthTokenModel().get_auth_tokens(
c.user.user_id, show_expired=True)
return self._get_template_context(c)
def maybe_attach_token_scope(self, token):
# implemented in EE edition
pass
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_auth_tokens_add', request_method='POST')
def auth_tokens_add(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
user_data = c.user.get_api_data()
lifetime = safe_int(self.request.POST.get('lifetime'), -1)
description = self.request.POST.get('description')
role = self.request.POST.get('role')
token = AuthTokenModel().create(
c.user.user_id, description, lifetime, role)
token_data = token.get_api_data()
self.maybe_attach_token_scope(token)
audit_logger.store_web(
'user.edit.token.add', action_data={
'data': {'token': token_data, 'user': user_data}},
user=self._rhodecode_user, )
Session().commit()
h.flash(_("Auth token successfully created"), category='success')
return HTTPFound(h.route_path('edit_user_auth_tokens', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_auth_tokens_delete', request_method='POST')
def auth_tokens_delete(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
user_data = c.user.get_api_data()
del_auth_token = self.request.POST.get('del_auth_token')
if del_auth_token:
token = UserApiKeys.get_or_404(del_auth_token)
token_data = token.get_api_data()
AuthTokenModel().delete(del_auth_token, c.user.user_id)
audit_logger.store_web(
'user.edit.token.delete', action_data={
'data': {'token': token_data, 'user': user_data}},
user=self._rhodecode_user,)
Session().commit()
h.flash(_("Auth token successfully deleted"), category='success')
return HTTPFound(h.route_path('edit_user_auth_tokens', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_ssh_keys', request_method='GET',
renderer='rhodecode:templates/admin/users/user_edit.mako')
def ssh_keys(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
c.active = 'ssh_keys'
c.default_key = self.request.GET.get('default_key')
c.user_ssh_keys = SshKeyModel().get_ssh_keys(c.user.user_id)
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_ssh_keys_generate_keypair', request_method='GET',
renderer='rhodecode:templates/admin/users/user_edit.mako')
def ssh_keys_generate_keypair(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
c.active = 'ssh_keys_generate'
comment = 'RhodeCode-SSH {}'.format(c.user.email or '')
c.private, c.public = SshKeyModel().generate_keypair(comment=comment)
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_ssh_keys_add', request_method='POST')
def ssh_keys_add(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
user_data = c.user.get_api_data()
key_data = self.request.POST.get('key_data')
description = self.request.POST.get('description')
try:
if not key_data:
raise ValueError('Please add a valid public key')
key = SshKeyModel().parse_key(key_data.strip())
fingerprint = key.hash_md5()
ssh_key = SshKeyModel().create(
c.user.user_id, fingerprint, key_data, description)
ssh_key_data = ssh_key.get_api_data()
audit_logger.store_web(
'user.edit.ssh_key.add', action_data={
'data': {'ssh_key': ssh_key_data, 'user': user_data}},
user=self._rhodecode_user, )
Session().commit()
# Trigger an event on change of keys.
trigger(SshKeyFileChangeEvent(), self.request.registry)
h.flash(_("Ssh Key successfully created"), category='success')
except IntegrityError:
log.exception("Exception during ssh key saving")
h.flash(_('An error occurred during ssh key saving: {}').format(
'Such key already exists, please use a different one'),
category='error')
except Exception as e:
log.exception("Exception during ssh key saving")
h.flash(_('An error occurred during ssh key saving: {}').format(e),
category='error')
return HTTPFound(
h.route_path('edit_user_ssh_keys', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_ssh_keys_delete', request_method='POST')
def ssh_keys_delete(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
user_data = c.user.get_api_data()
del_ssh_key = self.request.POST.get('del_ssh_key')
if del_ssh_key:
ssh_key = UserSshKeys.get_or_404(del_ssh_key)
ssh_key_data = ssh_key.get_api_data()
SshKeyModel().delete(del_ssh_key, c.user.user_id)
audit_logger.store_web(
'user.edit.ssh_key.delete', action_data={
'data': {'ssh_key': ssh_key_data, 'user': user_data}},
user=self._rhodecode_user,)
Session().commit()
# Trigger an event on change of keys.
trigger(SshKeyFileChangeEvent(), self.request.registry)
h.flash(_("Ssh key successfully deleted"), category='success')
return HTTPFound(h.route_path('edit_user_ssh_keys', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_emails', request_method='GET',
renderer='rhodecode:templates/admin/users/user_edit.mako')
def emails(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
c.active = 'emails'
c.user_email_map = UserEmailMap.query() \
.filter(UserEmailMap.user == c.user).all()
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_emails_add', request_method='POST')
def emails_add(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
email = self.request.POST.get('new_email')
user_data = c.user.get_api_data()
try:
UserModel().add_extra_email(c.user.user_id, email)
audit_logger.store_web(
'user.edit.email.add', action_data={'email': email, 'user': user_data},
user=self._rhodecode_user)
Session().commit()
h.flash(_("Added new email address `%s` for user account") % email,
category='success')
except formencode.Invalid as error:
h.flash(h.escape(error.error_dict['email']), category='error')
except Exception:
log.exception("Exception during email saving")
h.flash(_('An error occurred during email saving'),
category='error')
raise HTTPFound(h.route_path('edit_user_emails', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_emails_delete', request_method='POST')
def emails_delete(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
email_id = self.request.POST.get('del_email_id')
user_model = UserModel()
email = UserEmailMap.query().get(email_id).email
user_data = c.user.get_api_data()
user_model.delete_extra_email(c.user.user_id, email_id)
audit_logger.store_web(
'user.edit.email.delete', action_data={'email': email, 'user': user_data},
user=self._rhodecode_user)
Session().commit()
h.flash(_("Removed email address from user account"),
category='success')
raise HTTPFound(h.route_path('edit_user_emails', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_ips', request_method='GET',
renderer='rhodecode:templates/admin/users/user_edit.mako')
def ips(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
c.active = 'ips'
c.user_ip_map = UserIpMap.query() \
.filter(UserIpMap.user == c.user).all()
c.inherit_default_ips = c.user.inherit_default_permissions
c.default_user_ip_map = UserIpMap.query() \
.filter(UserIpMap.user == User.get_default_user()).all()
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_ips_add', request_method='POST')
def ips_add(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
# NOTE(marcink): this view is allowed for default users, as we can
# edit their IP white list
user_model = UserModel()
desc = self.request.POST.get('description')
try:
ip_list = user_model.parse_ip_range(
self.request.POST.get('new_ip'))
except Exception as e:
ip_list = []
log.exception("Exception during ip saving")
h.flash(_('An error occurred during ip saving:%s' % (e,)),
category='error')
added = []
user_data = c.user.get_api_data()
for ip in ip_list:
try:
user_model.add_extra_ip(c.user.user_id, ip, desc)
audit_logger.store_web(
'user.edit.ip.add', action_data={'ip': ip, 'user': user_data},
user=self._rhodecode_user)
Session().commit()
added.append(ip)
except formencode.Invalid as error:
msg = error.error_dict['ip']
h.flash(msg, category='error')
except Exception:
log.exception("Exception during ip saving")
h.flash(_('An error occurred during ip saving'),
category='error')
if added:
h.flash(
_("Added ips %s to user whitelist") % (', '.join(ip_list), ),
category='success')
if 'default_user' in self.request.POST:
# case for editing global IP list we do it for 'DEFAULT' user
raise HTTPFound(h.route_path('admin_permissions_ips'))
raise HTTPFound(h.route_path('edit_user_ips', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_ips_delete', request_method='POST')
def ips_delete(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
# NOTE(marcink): this view is allowed for default users, as we can
# edit their IP white list
ip_id = self.request.POST.get('del_ip_id')
user_model = UserModel()
user_data = c.user.get_api_data()
ip = UserIpMap.query().get(ip_id).ip_addr
user_model.delete_extra_ip(c.user.user_id, ip_id)
audit_logger.store_web(
'user.edit.ip.delete', action_data={'ip': ip, 'user': user_data},
user=self._rhodecode_user)
Session().commit()
h.flash(_("Removed ip address from user whitelist"), category='success')
if 'default_user' in self.request.POST:
# case for editing global IP list we do it for 'DEFAULT' user
raise HTTPFound(h.route_path('admin_permissions_ips'))
raise HTTPFound(h.route_path('edit_user_ips', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_groups_management', request_method='GET',
renderer='rhodecode:templates/admin/users/user_edit.mako')
def groups_management(self):
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
c.data = c.user.group_member
self._redirect_for_default_user(c.user.username)
groups = [UserGroupModel.get_user_groups_as_dict(group.users_group)
for group in c.user.group_member]
c.groups = json.dumps(groups)
c.active = 'groups'
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@CSRFRequired()
@view_config(
route_name='edit_user_groups_management_updates', request_method='POST')
def groups_management_updates(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
user_groups = set(self.request.POST.getall('users_group_id'))
user_groups_objects = []
for ugid in user_groups:
user_groups_objects.append(
UserGroupModel().get_group(safe_int(ugid)))
user_group_model = UserGroupModel()
user_group_model.change_groups(c.user, user_groups_objects)
Session().commit()
c.active = 'user_groups_management'
h.flash(_("Groups successfully changed"), category='success')
return HTTPFound(h.route_path(
'edit_user_groups_management', user_id=user_id))
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_audit_logs', request_method='GET',
renderer='rhodecode:templates/admin/users/user_edit.mako')
def user_audit_logs(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
c.active = 'audit'
p = safe_int(self.request.GET.get('page', 1), 1)
filter_term = self.request.GET.get('filter')
user_log = UserModel().get_user_log(c.user, filter_term)
def url_generator(**kw):
if filter_term:
kw['filter'] = filter_term
return self.request.current_route_path(_query=kw)
c.audit_logs = h.Page(
user_log, page=p, items_per_page=10, url=url_generator)
c.filter_term = filter_term
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_perms_summary', request_method='GET',
renderer='rhodecode:templates/admin/users/user_edit.mako')
def user_perms_summary(self):
_ = self.request.translate
c = self.load_default_context()
user_id = self.request.matchdict.get('user_id')
c.user = User.get_or_404(user_id)
self._redirect_for_default_user(c.user.username)
c.active = 'perms_summary'
c.perm_user = c.user.AuthUser(ip_addr=self.request.remote_addr)
return self._get_template_context(c)
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
@view_config(
route_name='edit_user_perms_summary_json', request_method='GET',
renderer='json_ext')
def user_perms_summary_json(self):
self.load_default_context()
user_id = self.request.matchdict.get('user_id')
user = User.get_or_404(user_id)
self._redirect_for_default_user(user.username)
perm_user = user.AuthUser(ip_addr=self.request.remote_addr)
return perm_user.permissions