user_groups.py
471 lines
| 19.4 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
# Copyright (C) 2011-2016 RhodeCode GmbH | ||||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
User Groups crud controller for pylons | ||||
""" | ||||
import logging | ||||
import formencode | ||||
from formencode import htmlfill | ||||
from pylons import request, tmpl_context as c, url, config | ||||
from pylons.controllers.util import redirect | ||||
from pylons.i18n.translation import _ | ||||
from sqlalchemy.orm import joinedload | ||||
from rhodecode.lib import auth | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.lib.exceptions import UserGroupAssignedException,\ | ||||
RepoGroupAssignmentError | ||||
from rhodecode.lib.utils2 import safe_unicode, str2bool, safe_int | ||||
from rhodecode.lib.auth import ( | ||||
LoginRequired, NotAnonymous, HasUserGroupPermissionAnyDecorator, | ||||
HasPermissionAnyDecorator) | ||||
from rhodecode.lib.base import BaseController, render | ||||
from rhodecode.model.permission import PermissionModel | ||||
from rhodecode.model.scm import UserGroupList | ||||
from rhodecode.model.user_group import UserGroupModel | ||||
from rhodecode.model.db import ( | ||||
User, UserGroup, UserGroupRepoToPerm, UserGroupRepoGroupToPerm) | ||||
from rhodecode.model.forms import ( | ||||
UserGroupForm, UserGroupPermsForm, UserIndividualPermissionsForm, | ||||
UserPermissionsForm) | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.lib.utils import action_logger | ||||
from rhodecode.lib.ext_json import json | ||||
log = logging.getLogger(__name__) | ||||
class UserGroupsController(BaseController): | ||||
"""REST Controller styled on the Atom Publishing Protocol""" | ||||
@LoginRequired() | ||||
def __before__(self): | ||||
super(UserGroupsController, self).__before__() | ||||
c.available_permissions = config['available_permissions'] | ||||
PermissionModel().set_global_permission_choices(c, translator=_) | ||||
def __load_data(self, user_group_id): | ||||
c.group_members_obj = [x.user for x in c.user_group.members] | ||||
c.group_members_obj.sort(key=lambda u: u.username.lower()) | ||||
c.group_members = [(x.user_id, x.username) for x in c.group_members_obj] | ||||
c.available_members = [(x.user_id, x.username) | ||||
for x in User.query().all()] | ||||
c.available_members.sort(key=lambda u: u[1].lower()) | ||||
def __load_defaults(self, user_group_id): | ||||
""" | ||||
Load defaults settings for edit, and update | ||||
:param user_group_id: | ||||
""" | ||||
user_group = UserGroup.get_or_404(user_group_id) | ||||
data = user_group.get_dict() | ||||
# fill owner | ||||
if user_group.user: | ||||
data.update({'user': user_group.user.username}) | ||||
else: | ||||
replacement_user = User.get_first_admin().username | ||||
data.update({'user': replacement_user}) | ||||
return data | ||||
def _revoke_perms_on_yourself(self, form_result): | ||||
_updates = filter(lambda u: c.rhodecode_user.user_id == int(u[0]), | ||||
form_result['perm_updates']) | ||||
_additions = filter(lambda u: c.rhodecode_user.user_id == int(u[0]), | ||||
form_result['perm_additions']) | ||||
_deletions = filter(lambda u: c.rhodecode_user.user_id == int(u[0]), | ||||
form_result['perm_deletions']) | ||||
admin_perm = 'usergroup.admin' | ||||
if _updates and _updates[0][1] != admin_perm or \ | ||||
_additions and _additions[0][1] != admin_perm or \ | ||||
_deletions and _deletions[0][1] != admin_perm: | ||||
return True | ||||
return False | ||||
# permission check inside | ||||
@NotAnonymous() | ||||
def index(self): | ||||
"""GET /users_groups: All items in the collection""" | ||||
# url('users_groups') | ||||
from rhodecode.lib.utils import PartialRenderer | ||||
_render = PartialRenderer('data_table/_dt_elements.html') | ||||
def user_group_name(user_group_id, user_group_name): | ||||
return _render("user_group_name", user_group_id, user_group_name) | ||||
def user_group_actions(user_group_id, user_group_name): | ||||
return _render("user_group_actions", user_group_id, user_group_name) | ||||
## json generate | ||||
group_iter = UserGroupList(UserGroup.query().all(), | ||||
perm_set=['usergroup.admin']) | ||||
user_groups_data = [] | ||||
for user_gr in group_iter: | ||||
user_groups_data.append({ | ||||
"group_name": user_group_name( | ||||
user_gr.users_group_id, h.escape(user_gr.users_group_name)), | ||||
"group_name_raw": user_gr.users_group_name, | ||||
"desc": h.escape(user_gr.user_group_description), | ||||
"members": len(user_gr.members), | ||||
"active": h.bool2icon(user_gr.users_group_active), | ||||
"owner": h.escape(h.link_to_user(user_gr.user.username)), | ||||
"action": user_group_actions( | ||||
user_gr.users_group_id, user_gr.users_group_name) | ||||
}) | ||||
c.data = json.dumps(user_groups_data) | ||||
return render('admin/user_groups/user_groups.html') | ||||
@HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true') | ||||
@auth.CSRFRequired() | ||||
def create(self): | ||||
"""POST /users_groups: Create a new item""" | ||||
# url('users_groups') | ||||
users_group_form = UserGroupForm()() | ||||
try: | ||||
form_result = users_group_form.to_python(dict(request.POST)) | ||||
user_group = UserGroupModel().create( | ||||
name=form_result['users_group_name'], | ||||
description=form_result['user_group_description'], | ||||
owner=c.rhodecode_user.user_id, | ||||
active=form_result['users_group_active']) | ||||
Session().flush() | ||||
user_group_name = form_result['users_group_name'] | ||||
action_logger(c.rhodecode_user, | ||||
'admin_created_users_group:%s' % user_group_name, | ||||
None, self.ip_addr, self.sa) | ||||
user_group_link = h.link_to(h.escape(user_group_name), | ||||
url('edit_users_group', | ||||
user_group_id=user_group.users_group_id)) | ||||
h.flash(h.literal(_('Created user group %(user_group_link)s') | ||||
% {'user_group_link': user_group_link}), | ||||
category='success') | ||||
Session().commit() | ||||
except formencode.Invalid as errors: | ||||
return htmlfill.render( | ||||
render('admin/user_groups/user_group_add.html'), | ||||
defaults=errors.value, | ||||
errors=errors.error_dict or {}, | ||||
prefix_error=False, | ||||
encoding="UTF-8", | ||||
force_defaults=False) | ||||
except Exception: | ||||
log.exception("Exception creating user group") | ||||
h.flash(_('Error occurred during creation of user group %s') \ | ||||
% request.POST.get('users_group_name'), category='error') | ||||
return redirect(url('users_groups')) | ||||
@HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true') | ||||
def new(self): | ||||
"""GET /user_groups/new: Form to create a new item""" | ||||
# url('new_users_group') | ||||
return render('admin/user_groups/user_group_add.html') | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
@auth.CSRFRequired() | ||||
def update(self, user_group_id): | ||||
"""PUT /user_groups/user_group_id: Update an existing item""" | ||||
# Forms posted to this method should contain a hidden field: | ||||
# <input type="hidden" name="_method" value="PUT" /> | ||||
# Or using helpers: | ||||
# h.form(url('users_group', user_group_id=ID), | ||||
# method='put') | ||||
# url('users_group', user_group_id=ID) | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
c.active = 'settings' | ||||
self.__load_data(user_group_id) | ||||
available_members = [safe_unicode(x[0]) for x in c.available_members] | ||||
users_group_form = UserGroupForm(edit=True, | ||||
old_data=c.user_group.get_dict(), | ||||
available_members=available_members)() | ||||
try: | ||||
form_result = users_group_form.to_python(request.POST) | ||||
UserGroupModel().update(c.user_group, form_result) | ||||
gr = form_result['users_group_name'] | ||||
action_logger(c.rhodecode_user, | ||||
'admin_updated_users_group:%s' % gr, | ||||
None, self.ip_addr, self.sa) | ||||
h.flash(_('Updated user group %s') % gr, category='success') | ||||
Session().commit() | ||||
except formencode.Invalid as errors: | ||||
defaults = errors.value | ||||
e = errors.error_dict or {} | ||||
return htmlfill.render( | ||||
render('admin/user_groups/user_group_edit.html'), | ||||
defaults=defaults, | ||||
errors=e, | ||||
prefix_error=False, | ||||
encoding="UTF-8", | ||||
force_defaults=False) | ||||
except Exception: | ||||
log.exception("Exception during update of user group") | ||||
h.flash(_('Error occurred during update of user group %s') | ||||
% request.POST.get('users_group_name'), category='error') | ||||
return redirect(url('edit_users_group', user_group_id=user_group_id)) | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
@auth.CSRFRequired() | ||||
def delete(self, user_group_id): | ||||
"""DELETE /user_groups/user_group_id: Delete an existing item""" | ||||
# Forms posted to this method should contain a hidden field: | ||||
# <input type="hidden" name="_method" value="DELETE" /> | ||||
# Or using helpers: | ||||
# h.form(url('users_group', user_group_id=ID), | ||||
# method='delete') | ||||
# url('users_group', user_group_id=ID) | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
force = str2bool(request.POST.get('force')) | ||||
try: | ||||
UserGroupModel().delete(c.user_group, force=force) | ||||
Session().commit() | ||||
h.flash(_('Successfully deleted user group'), category='success') | ||||
except UserGroupAssignedException as e: | ||||
h.flash(str(e), category='error') | ||||
except Exception: | ||||
log.exception("Exception during deletion of user group") | ||||
h.flash(_('An error occurred during deletion of user group'), | ||||
category='error') | ||||
return redirect(url('users_groups')) | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
def edit(self, user_group_id): | ||||
"""GET /user_groups/user_group_id/edit: Form to edit an existing item""" | ||||
# url('edit_users_group', user_group_id=ID) | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
c.active = 'settings' | ||||
self.__load_data(user_group_id) | ||||
defaults = self.__load_defaults(user_group_id) | ||||
return htmlfill.render( | ||||
render('admin/user_groups/user_group_edit.html'), | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
def edit_perms(self, user_group_id): | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
c.active = 'perms' | ||||
defaults = {} | ||||
# fill user group users | ||||
for p in c.user_group.user_user_group_to_perm: | ||||
defaults.update({'u_perm_%s' % p.user.user_id: | ||||
p.permission.permission_name}) | ||||
for p in c.user_group.user_group_user_group_to_perm: | ||||
defaults.update({'g_perm_%s' % p.user_group.users_group_id: | ||||
p.permission.permission_name}) | ||||
return htmlfill.render( | ||||
render('admin/user_groups/user_group_edit.html'), | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
@auth.CSRFRequired() | ||||
def update_perms(self, user_group_id): | ||||
""" | ||||
grant permission for given usergroup | ||||
:param user_group_id: | ||||
""" | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
form = UserGroupPermsForm()().to_python(request.POST) | ||||
if not c.rhodecode_user.is_admin: | ||||
if self._revoke_perms_on_yourself(form): | ||||
msg = _('Cannot change permission for yourself as admin') | ||||
h.flash(msg, category='warning') | ||||
return redirect(url('edit_user_group_perms', user_group_id=user_group_id)) | ||||
try: | ||||
UserGroupModel().update_permissions(user_group_id, | ||||
form['perm_additions'], form['perm_updates'], form['perm_deletions']) | ||||
except RepoGroupAssignmentError: | ||||
h.flash(_('Target group cannot be the same'), category='error') | ||||
return redirect(url('edit_user_group_perms', user_group_id=user_group_id)) | ||||
#TODO: implement this | ||||
#action_logger(c.rhodecode_user, 'admin_changed_repo_permissions', | ||||
# repo_name, self.ip_addr, self.sa) | ||||
Session().commit() | ||||
h.flash(_('User Group permissions updated'), category='success') | ||||
return redirect(url('edit_user_group_perms', user_group_id=user_group_id)) | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
def edit_perms_summary(self, user_group_id): | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
c.active = 'perms_summary' | ||||
permissions = { | ||||
'repositories': {}, | ||||
'repositories_groups': {}, | ||||
} | ||||
ugroup_repo_perms = UserGroupRepoToPerm.query()\ | ||||
.options(joinedload(UserGroupRepoToPerm.permission))\ | ||||
.options(joinedload(UserGroupRepoToPerm.repository))\ | ||||
.filter(UserGroupRepoToPerm.users_group_id == user_group_id)\ | ||||
.all() | ||||
for gr in ugroup_repo_perms: | ||||
permissions['repositories'][gr.repository.repo_name] \ | ||||
= gr.permission.permission_name | ||||
ugroup_group_perms = UserGroupRepoGroupToPerm.query()\ | ||||
.options(joinedload(UserGroupRepoGroupToPerm.permission))\ | ||||
.options(joinedload(UserGroupRepoGroupToPerm.group))\ | ||||
.filter(UserGroupRepoGroupToPerm.users_group_id == user_group_id)\ | ||||
.all() | ||||
for gr in ugroup_group_perms: | ||||
permissions['repositories_groups'][gr.group.group_name] \ | ||||
= gr.permission.permission_name | ||||
c.permissions = permissions | ||||
return render('admin/user_groups/user_group_edit.html') | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
def edit_global_perms(self, user_group_id): | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
c.active = 'global_perms' | ||||
c.default_user = User.get_default_user() | ||||
defaults = c.user_group.get_dict() | ||||
defaults.update(c.default_user.get_default_perms(suffix='_inherited')) | ||||
defaults.update(c.user_group.get_default_perms()) | ||||
return htmlfill.render( | ||||
render('admin/user_groups/user_group_edit.html'), | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
@auth.CSRFRequired() | ||||
def update_global_perms(self, user_group_id): | ||||
"""PUT /users_perm/user_group_id: Update an existing item""" | ||||
# url('users_group_perm', user_group_id=ID, method='put') | ||||
user_group_id = safe_int(user_group_id) | ||||
user_group = UserGroup.get_or_404(user_group_id) | ||||
c.active = 'global_perms' | ||||
try: | ||||
# first stage that verifies the checkbox | ||||
_form = UserIndividualPermissionsForm() | ||||
form_result = _form.to_python(dict(request.POST)) | ||||
inherit_perms = form_result['inherit_default_permissions'] | ||||
user_group.inherit_default_permissions = inherit_perms | ||||
Session().add(user_group) | ||||
if not inherit_perms: | ||||
# only update the individual ones if we un check the flag | ||||
_form = UserPermissionsForm( | ||||
[x[0] for x in c.repo_create_choices], | ||||
[x[0] for x in c.repo_create_on_write_choices], | ||||
[x[0] for x in c.repo_group_create_choices], | ||||
[x[0] for x in c.user_group_create_choices], | ||||
[x[0] for x in c.fork_choices], | ||||
[x[0] for x in c.inherit_default_permission_choices])() | ||||
form_result = _form.to_python(dict(request.POST)) | ||||
form_result.update({'perm_user_group_id': user_group.users_group_id}) | ||||
PermissionModel().update_user_group_permissions(form_result) | ||||
Session().commit() | ||||
h.flash(_('User Group global permissions updated successfully'), | ||||
category='success') | ||||
except formencode.Invalid as errors: | ||||
defaults = errors.value | ||||
c.user_group = user_group | ||||
return htmlfill.render( | ||||
render('admin/user_groups/user_group_edit.html'), | ||||
defaults=defaults, | ||||
errors=errors.error_dict or {}, | ||||
prefix_error=False, | ||||
encoding="UTF-8", | ||||
force_defaults=False) | ||||
except Exception: | ||||
log.exception("Exception during permissions saving") | ||||
h.flash(_('An error occurred during permissions saving'), | ||||
category='error') | ||||
return redirect(url('edit_user_group_global_perms', user_group_id=user_group_id)) | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
def edit_advanced(self, user_group_id): | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
c.active = 'advanced' | ||||
c.group_members_obj = sorted( | ||||
(x.user for x in c.user_group.members), | ||||
key=lambda u: u.username.lower()) | ||||
c.group_to_repos = sorted( | ||||
(x.repository for x in c.user_group.users_group_repo_to_perm), | ||||
key=lambda u: u.repo_name.lower()) | ||||
c.group_to_repo_groups = sorted( | ||||
(x.group for x in c.user_group.users_group_repo_group_to_perm), | ||||
key=lambda u: u.group_name.lower()) | ||||
return render('admin/user_groups/user_group_edit.html') | ||||
@HasUserGroupPermissionAnyDecorator('usergroup.admin') | ||||
def edit_members(self, user_group_id): | ||||
user_group_id = safe_int(user_group_id) | ||||
c.user_group = UserGroup.get_or_404(user_group_id) | ||||
c.active = 'members' | ||||
c.group_members_obj = sorted((x.user for x in c.user_group.members), | ||||
key=lambda u: u.username.lower()) | ||||
c.group_members = [(x.user_id, x.username) for x in c.group_members_obj] | ||||
return render('admin/user_groups/user_group_edit.html') | ||||