user_groups.py
514 lines
| 20.4 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
r1271 | # Copyright (C) 2011-2017 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
User Groups crud controller for pylons | ||||
""" | ||||
import logging | ||||
import formencode | ||||
r1089 | import peppercorn | |||
r1 | from formencode import htmlfill | |||
from pylons import request, tmpl_context as c, url, config | ||||
from pylons.controllers.util import redirect | ||||
from pylons.i18n.translation import _ | ||||
from sqlalchemy.orm import joinedload | ||||
from rhodecode.lib import auth | ||||
from rhodecode.lib import helpers as h | ||||
r1805 | from rhodecode.lib import audit_logger | |||
r1796 | from rhodecode.lib.ext_json import json | |||
r1 | from rhodecode.lib.exceptions import UserGroupAssignedException,\ | |||
RepoGroupAssignmentError | ||||
r1805 | from rhodecode.lib.utils import jsonify | |||
r1 | from rhodecode.lib.utils2 import safe_unicode, str2bool, safe_int | |||
from rhodecode.lib.auth import ( | ||||
LoginRequired, NotAnonymous, HasUserGroupPermissionAnyDecorator, | ||||
r1089 | HasPermissionAnyDecorator, XHRRequired) | |||
r1 | from rhodecode.lib.base import BaseController, render | |||
from rhodecode.model.permission import PermissionModel | ||||
from rhodecode.model.scm import UserGroupList | ||||
from rhodecode.model.user_group import UserGroupModel | ||||
from rhodecode.model.db import ( | ||||
User, UserGroup, UserGroupRepoToPerm, UserGroupRepoGroupToPerm) | ||||
from rhodecode.model.forms import ( | ||||
UserGroupForm, UserGroupPermsForm, UserIndividualPermissionsForm, | ||||
UserPermissionsForm) | ||||
from rhodecode.model.meta import Session | ||||
r1796 | ||||
r1 | ||||
log = logging.getLogger(__name__) | ||||
class UserGroupsController(BaseController):
    """REST Controller styled on the Atom Publishing Protocol.

    Pylons controller with the admin views for user groups: listing,
    create/update/delete, permission editing, global permissions, the
    "advanced" page and the JSON members endpoint.
    """

    @LoginRequired()
    def __before__(self):
        """Prepare the template context shared by every action."""
        super(UserGroupsController, self).__before__()
        c.available_permissions = config['available_permissions']
        PermissionModel().set_global_permission_choices(c, gettext_translator=_)

    def __load_data(self, user_group_id):
        """Expose the members of ``c.user_group`` on the template context.

        Sets ``c.group_members_obj`` (member users ordered by lower-cased
        username) and ``c.group_members`` (``(user_id, username)`` tuples in
        the same order).

        :param user_group_id: not used by this method; ``c.user_group`` must
            already be set by the caller
        """
        c.group_members_obj = sorted(
            (member.user for member in c.user_group.members),
            key=lambda u: u.username.lower())
        c.group_members = [
            (user.user_id, user.username) for user in c.group_members_obj]

    def __load_defaults(self, user_group_id):
        """
        Load defaults settings for edit, and update

        :param user_group_id: id of the user group to load (404 if missing)
        :returns: ``user_group.get_dict()`` extended with a ``user`` key
            holding the owner's username
        """
        user_group = UserGroup.get_or_404(user_group_id)
        data = user_group.get_dict()

        # fill owner; fall back to the first super admin when the group
        # has no owner attached
        owner = user_group.user
        if owner:
            data['user'] = owner.username
        else:
            data['user'] = User.get_first_super_admin().username
        return data

    def _revoke_perms_on_yourself(self, form_result):
        """Tell whether the form changes the current user's own permission.

        For each of ``perm_updates``, ``perm_additions`` and
        ``perm_deletions`` the first entry whose id (``entry[0]``) equals the
        id of ``c.rhodecode_user`` is inspected; the check trips when its
        permission (``entry[1]``) is anything other than ``usergroup.admin``.

        :param form_result: validated permission form data
        :returns: ``True`` if such an entry exists, ``False`` otherwise
        """
        admin_perm = 'usergroup.admin'
        own_id = c.rhodecode_user.user_id

        # Real lists (not ``filter`` objects) so that truthiness and
        # indexing behave the same on every Python version.
        own_entries = [
            [entry for entry in form_result[key] if int(entry[0]) == own_id]
            for key in ('perm_updates', 'perm_additions', 'perm_deletions')]

        return any(
            entries and entries[0][1] != admin_perm
            for entries in own_entries)

    # permission check inside
    @NotAnonymous()
    def index(self):
        """Render the user group listing.

        Serialises one row per group yielded by ``UserGroupList`` (built
        with ``perm_set=['usergroup.admin']``) into ``c.data`` as JSON and
        renders the listing template.
        """
        # TODO(marcink): remove bind to self.request after pyramid migration
        self.request = c.pyramid_request
        _render = self.request.get_partial_renderer(
            'data_table/_dt_elements.mako')

        def user_group_name(user_group_id, user_group_name):
            return _render("user_group_name", user_group_id, user_group_name)

        def user_group_actions(user_group_id, user_group_name):
            return _render("user_group_actions", user_group_id, user_group_name)

        # json generate
        group_iter = UserGroupList(
            UserGroup.query().all(), perm_set=['usergroup.admin'])

        user_groups_data = [
            {
                "group_name": user_group_name(
                    user_gr.users_group_id,
                    h.escape(user_gr.users_group_name)),
                "group_name_raw": user_gr.users_group_name,
                "desc": h.escape(user_gr.user_group_description),
                "members": len(user_gr.members),
                "sync": user_gr.group_data.get('extern_type'),
                "active": h.bool2icon(user_gr.users_group_active),
                "owner": h.escape(h.link_to_user(user_gr.user.username)),
                "action": user_group_actions(
                    user_gr.users_group_id, user_gr.users_group_name)
            }
            for user_gr in group_iter
        ]

        c.data = json.dumps(user_groups_data)
        return render('admin/user_groups/user_groups.mako')

    @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
    @auth.CSRFRequired()
    def create(self):
        """Create a user group from the POSTed form.

        On success the group is committed, a ``user_group.create`` audit
        entry is stored and the client is redirected to the edit page of the
        new group. On validation errors the "add" form is re-rendered with
        the errors. On any other failure an error is flashed and the client
        is redirected to the user group listing, because there is no
        committed group whose edit page could be shown.
        """
        users_group_form = UserGroupForm()()
        try:
            form_result = users_group_form.to_python(dict(request.POST))
            user_group = UserGroupModel().create(
                name=form_result['users_group_name'],
                description=form_result['user_group_description'],
                owner=c.rhodecode_user.user_id,
                active=form_result['users_group_active'])
            # flush before reading api data / the id of the new group
            Session().flush()
            creation_data = user_group.get_api_data()
            user_group_name = form_result['users_group_name']

            audit_logger.store_web(
                'user_group.create', action_data={'data': creation_data},
                user=c.rhodecode_user)

            user_group_link = h.link_to(
                h.escape(user_group_name),
                url('edit_users_group',
                    user_group_id=user_group.users_group_id))
            h.flash(h.literal(_('Created user group %(user_group_link)s')
                              % {'user_group_link': user_group_link}),
                    category='success')
            Session().commit()
        except formencode.Invalid as errors:
            return htmlfill.render(
                render('admin/user_groups/user_group_add.mako'),
                defaults=errors.value,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)
        except Exception:
            log.exception("Exception creating user group")
            h.flash(_('Error occurred during creation of user group %s')
                    % request.POST.get('users_group_name'), category='error')
            # ``user_group`` may be unbound (or uncommitted) here, so the
            # edit page cannot be the redirect target.
            return redirect(url('users_groups'))

        return redirect(
            url('edit_users_group', user_group_id=user_group.users_group_id))

    @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
    def new(self):
        """GET /user_groups/new: Form to create a new item"""
        # url('new_users_group')
        return render('admin/user_groups/user_group_add.mako')

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @auth.CSRFRequired()
    def update(self, user_group_id):
        """Update settings and members of a user group from the POSTed form.

        The member list is taken from the peppercorn-parsed
        ``user_group_members`` structure. A ``user_group.edit`` audit entry
        carrying the previous API data is stored before the commit.
        Validation errors re-render the edit form; any other failure is
        logged and flashed. Unless the form is re-rendered, the client is
        redirected back to the edit page.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        c.user_group = UserGroup.get_or_404(user_group_id)
        c.active = 'settings'
        self.__load_data(user_group_id)

        users_group_form = UserGroupForm(
            edit=True, old_data=c.user_group.get_dict(), allow_disabled=True)()

        # snapshot taken before the update, stored in the audit entry
        old_values = c.user_group.get_api_data()
        try:
            form_result = users_group_form.to_python(request.POST)
            pstruct = peppercorn.parse(request.POST.items())
            form_result['users_group_members'] = pstruct['user_group_members']

            user_group, added_members, removed_members = \
                UserGroupModel().update(c.user_group, form_result)
            updated_user_group = form_result['users_group_name']

            audit_logger.store_web(
                'user_group.edit', action_data={'old_data': old_values},
                user=c.rhodecode_user)

            # TODO(marcink): use added/removed to set user_group.edit.member.add

            h.flash(_('Updated user group %s') % updated_user_group,
                    category='success')
            Session().commit()
        except formencode.Invalid as errors:
            return htmlfill.render(
                render('admin/user_groups/user_group_edit.mako'),
                defaults=errors.value,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)
        except Exception:
            log.exception("Exception during update of user group")
            h.flash(_('Error occurred during update of user group %s')
                    % request.POST.get('users_group_name'), category='error')

        return redirect(url('edit_users_group', user_group_id=user_group_id))

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @auth.CSRFRequired()
    def delete(self, user_group_id):
        """Delete a user group and redirect to the user group listing.

        The POST field ``force`` is converted with ``str2bool`` and passed
        on to the model. A ``UserGroupAssignedException`` raised by the
        model is flashed verbatim; any other failure is logged and flashed
        with a generic message.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        c.user_group = UserGroup.get_or_404(user_group_id)
        force = str2bool(request.POST.get('force'))

        # snapshot taken before the deletion, stored in the audit entry
        old_values = c.user_group.get_api_data()
        try:
            UserGroupModel().delete(c.user_group, force=force)
            # Same action namespace as 'user_group.create' and
            # 'user_group.edit' used by the sibling actions.
            audit_logger.store_web(
                'user_group.delete', action_data={'old_data': old_values},
                user=c.rhodecode_user)
            Session().commit()
            h.flash(_('Successfully deleted user group'), category='success')
        except UserGroupAssignedException as e:
            h.flash(str(e), category='error')
        except Exception:
            log.exception("Exception during deletion of user group")
            h.flash(_('An error occurred during deletion of user group'),
                    category='error')

        return redirect(url('users_groups'))

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def edit(self, user_group_id):
        """GET /user_groups/user_group_id/edit: Form to edit an existing item"""
        # url('edit_users_group', user_group_id=ID)
        user_group_id = safe_int(user_group_id)
        c.user_group = UserGroup.get_or_404(user_group_id)
        c.active = 'settings'
        self.__load_data(user_group_id)

        defaults = self.__load_defaults(user_group_id)
        return htmlfill.render(
            render('admin/user_groups/user_group_edit.mako'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False
        )

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def edit_perms(self, user_group_id):
        """Render the permissions tab of the edit page.

        Form defaults are keyed ``u_perm_<user_id>`` for permissions granted
        to users and ``g_perm_<users_group_id>`` for permissions granted to
        user groups, each mapped to the permission name.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        c.user_group = UserGroup.get_or_404(user_group_id)
        c.active = 'perms'

        defaults = {}
        # fill user group users
        for perm in c.user_group.user_user_group_to_perm:
            defaults['u_perm_%s' % perm.user.user_id] = \
                perm.permission.permission_name

        for perm in c.user_group.user_group_user_group_to_perm:
            defaults['g_perm_%s' % perm.user_group.users_group_id] = \
                perm.permission.permission_name

        return htmlfill.render(
            render('admin/user_groups/user_group_edit.mako'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False
        )

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @auth.CSRFRequired()
    def update_perms(self, user_group_id):
        """
        grant permission for given usergroup

        Users for whom ``c.rhodecode_user.is_admin`` is false are refused
        when ``_revoke_perms_on_yourself`` reports a change to their own
        entry. Every path ends in a redirect to the permissions page.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        c.user_group = UserGroup.get_or_404(user_group_id)
        form = UserGroupPermsForm()().to_python(request.POST)
        perms_url = url('edit_user_group_perms', user_group_id=user_group_id)

        if not c.rhodecode_user.is_admin:
            if self._revoke_perms_on_yourself(form):
                msg = _('Cannot change permission for yourself as admin')
                h.flash(msg, category='warning')
                return redirect(perms_url)

        try:
            UserGroupModel().update_permissions(
                user_group_id,
                form['perm_additions'],
                form['perm_updates'],
                form['perm_deletions'])
        except RepoGroupAssignmentError:
            h.flash(_('Target group cannot be the same'), category='error')
            return redirect(perms_url)

        # TODO(marcink): implement global permissions
        # audit_log.store_web('user_group.edit.permissions')

        Session().commit()
        h.flash(_('User Group permissions updated'), category='success')
        return redirect(perms_url)

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def edit_perms_summary(self, user_group_id):
        """Render the permissions summary tab of the edit page.

        ``c.permissions`` maps ``repositories`` to
        ``{repo_name: permission_name}`` and ``repositories_groups`` to
        ``{group_name: permission_name}`` for the permissions held by this
        user group.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        c.user_group = UserGroup.get_or_404(user_group_id)
        c.active = 'perms_summary'

        ugroup_repo_perms = UserGroupRepoToPerm.query()\
            .options(joinedload(UserGroupRepoToPerm.permission))\
            .options(joinedload(UserGroupRepoToPerm.repository))\
            .filter(UserGroupRepoToPerm.users_group_id == user_group_id)\
            .all()
        repo_perms = dict(
            (entry.repository.repo_name, entry.permission.permission_name)
            for entry in ugroup_repo_perms)

        ugroup_group_perms = UserGroupRepoGroupToPerm.query()\
            .options(joinedload(UserGroupRepoGroupToPerm.permission))\
            .options(joinedload(UserGroupRepoGroupToPerm.group))\
            .filter(UserGroupRepoGroupToPerm.users_group_id == user_group_id)\
            .all()
        repo_group_perms = dict(
            (entry.group.group_name, entry.permission.permission_name)
            for entry in ugroup_group_perms)

        c.permissions = {
            'repositories': repo_perms,
            'repositories_groups': repo_group_perms,
        }
        return render('admin/user_groups/user_group_edit.mako')

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def edit_global_perms(self, user_group_id):
        """Render the global permissions tab of the edit page.

        Form defaults start from the group's ``get_dict()``, are extended
        with the default user's permissions (suffix ``_inherited``) and
        finally with the group's own default permissions.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        c.user_group = UserGroup.get_or_404(user_group_id)
        c.active = 'global_perms'
        c.default_user = User.get_default_user()

        defaults = c.user_group.get_dict()
        defaults.update(c.default_user.get_default_perms(suffix='_inherited'))
        defaults.update(c.user_group.get_default_perms())

        return htmlfill.render(
            render('admin/user_groups/user_group_edit.mako'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False
        )

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @auth.CSRFRequired()
    def update_global_perms(self, user_group_id):
        """Store the global permission settings of a user group.

        The ``inherit_default_permissions`` flag is always saved; the
        individual permissions are validated and saved only when that flag
        is off. Validation errors re-render the edit form; any other failure
        is logged and flashed. Unless the form is re-rendered, the client is
        redirected back to the global permissions page.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        user_group = UserGroup.get_or_404(user_group_id)
        c.active = 'global_perms'

        try:
            # first stage that verifies the checkbox
            _form = UserIndividualPermissionsForm()
            form_result = _form.to_python(dict(request.POST))
            inherit_perms = form_result['inherit_default_permissions']
            user_group.inherit_default_permissions = inherit_perms
            Session().add(user_group)

            if not inherit_perms:
                # only update the individual ones if we un check the flag
                # NOTE(review): the ``c.*_choices`` attributes are presumably
                # set by ``set_global_permission_choices`` in ``__before__``
                # -- confirm.
                _form = UserPermissionsForm(
                    [x[0] for x in c.repo_create_choices],
                    [x[0] for x in c.repo_create_on_write_choices],
                    [x[0] for x in c.repo_group_create_choices],
                    [x[0] for x in c.user_group_create_choices],
                    [x[0] for x in c.fork_choices],
                    [x[0] for x in c.inherit_default_permission_choices])()

                form_result = _form.to_python(dict(request.POST))
                form_result.update(
                    {'perm_user_group_id': user_group.users_group_id})

                PermissionModel().update_user_group_permissions(form_result)

            Session().commit()
            h.flash(_('User Group global permissions updated successfully'),
                    category='success')
        except formencode.Invalid as errors:
            defaults = errors.value
            # the edit template reads the group from the context
            c.user_group = user_group
            return htmlfill.render(
                render('admin/user_groups/user_group_edit.mako'),
                defaults=defaults,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)
        except Exception:
            log.exception("Exception during permissions saving")
            h.flash(_('An error occurred during permissions saving'),
                    category='error')

        return redirect(
            url('edit_user_group_global_perms', user_group_id=user_group_id))

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def edit_advanced(self, user_group_id):
        """Render the advanced tab of the edit page.

        Puts on the context the member users (by lower-cased username), the
        repositories (by lower-cased repo name) and the repository groups
        (by lower-cased group name) linked to this user group.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        c.user_group = UserGroup.get_or_404(user_group_id)
        c.active = 'advanced'

        c.group_members_obj = sorted(
            (x.user for x in c.user_group.members),
            key=lambda u: u.username.lower())

        c.group_to_repos = sorted(
            (x.repository for x in c.user_group.users_group_repo_to_perm),
            key=lambda u: u.repo_name.lower())

        c.group_to_repo_groups = sorted(
            (x.group for x in c.user_group.users_group_repo_group_to_perm),
            key=lambda u: u.group_name.lower())

        return render('admin/user_groups/user_group_edit.mako')

    # NOTE(review): unlike the other state-changing actions of this
    # controller (create, update, delete, update_perms, update_global_perms)
    # this one is not wrapped in auth.CSRFRequired() -- confirm whether that
    # is intentional.
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def edit_advanced_set_synchronization(self, user_group_id):
        """Toggle the ``extern_type`` marker stored in the group data.

        When ``extern_type`` is currently set it is reset to ``None``;
        otherwise it becomes ``'manual'`` and ``extern_type_set_by`` records
        the username of the current user. The client is then redirected to
        the advanced page.

        :param user_group_id: id of the user group (404 if missing)
        """
        user_group_id = safe_int(user_group_id)
        user_group = UserGroup.get_or_404(user_group_id)

        existing = user_group.group_data.get('extern_type')
        new_state = user_group.group_data
        if existing:
            new_state['extern_type'] = None
        else:
            new_state['extern_type'] = 'manual'
            new_state['extern_type_set_by'] = c.rhodecode_user.username

        try:
            user_group.group_data = new_state
            Session().add(user_group)
            Session().commit()

            h.flash(_('User Group synchronization updated successfully'),
                    category='success')
        except Exception:
            log.exception("Exception during sync settings saving")
            h.flash(_('An error occurred during synchronization update'),
                    category='error')

        return redirect(
            url('edit_user_group_advanced', user_group_id=user_group_id))

    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @XHRRequired()
    @jsonify
    def user_group_members(self, user_group_id):
        """
        Return members of given user group

        :param user_group_id: id of the user group (404 if missing)
        :returns: ``{'members': [...]}`` with one dict per member user,
            ordered by lower-cased username
        """
        user_group_id = safe_int(user_group_id)
        user_group = UserGroup.get_or_404(user_group_id)
        group_members_obj = sorted(
            (x.user for x in user_group.members),
            key=lambda u: u.username.lower())

        group_members = [
            {
                'id': user.user_id,
                'first_name': user.first_name,
                'last_name': user.last_name,
                'username': user.username,
                'icon_link': h.gravatar_url(user.email, 30),
                'value_display': h.person(user.email),
                'value': user.username,
                'value_type': 'user',
                'active': user.active,
            }
            for user in group_members_obj
        ]

        return {
            'members': group_members
        }