# -*- coding: utf-8 -*-

# Copyright (C) 2016-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import logging
|
|
|
|
|
|
import peppercorn
|
|
|
import formencode
|
|
|
import formencode.htmlfill
|
|
|
from pyramid.httpexceptions import HTTPFound
|
|
|
|
|
|
from pyramid.response import Response
|
|
|
from pyramid.renderers import render
|
|
|
|
|
|
from rhodecode import events
|
|
|
from rhodecode.lib.exceptions import (
|
|
|
RepoGroupAssignmentError, UserGroupAssignedException)
|
|
|
from rhodecode.model.forms import (
|
|
|
UserGroupPermsForm, UserGroupForm, UserIndividualPermissionsForm,
|
|
|
UserPermissionsForm)
|
|
|
from rhodecode.model.permission import PermissionModel
|
|
|
|
|
|
from rhodecode.apps._base import UserGroupAppView
|
|
|
from rhodecode.lib.auth import (
|
|
|
LoginRequired, HasUserGroupPermissionAnyDecorator, CSRFRequired)
|
|
|
from rhodecode.lib import helpers as h, audit_logger
|
|
|
from rhodecode.lib.utils2 import str2bool, safe_int
|
|
|
from rhodecode.model.db import User, UserGroup
|
|
|
from rhodecode.model.meta import Session
|
|
|
from rhodecode.model.user_group import UserGroupModel
|
|
|
|
|
|
# Module-level logger named after this module; the views below use it to
# record unexpected exceptions before flashing an error to the user.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class UserGroupsView(UserGroupAppView):
    """
    Views for inspecting and editing a single user group.

    Every view operates on ``self.db_user_group`` (presumably resolved by
    the ``UserGroupAppView`` base class from the request -- not visible
    here) and is guarded by ``LoginRequired`` plus the ``usergroup.admin``
    permission check. State-changing views additionally require a valid
    CSRF token.
    """

    # Template shared by all "edit user group" pages; the active tab is
    # selected through ``c.active``.
    _edit_template = (
        'rhodecode:templates/admin/user_groups/user_group_edit.mako')

    def load_default_context(self):
        """
        Build the template context used by all views of this class.

        Creates the local template context and passes it to
        ``PermissionModel.set_global_permission_choices`` together with the
        request translator.

        :return: the template context object
        """
        c = self._get_local_tmpl_context()

        PermissionModel().set_global_permission_choices(
            c, gettext_translator=self.request.translate)

        return c

    @staticmethod
    def _sorted_members(user_group):
        """
        Return the users of ``user_group`` as a list sorted by username.

        Sorting is case-insensitive (keyed on ``username.lower()``).
        """
        return sorted(
            (member.user for member in user_group.members),
            key=lambda u: u.username.lower())

    def _render_edit_form(self, c, defaults, errors=None):
        """
        Render the edit template and pre-fill its form fields.

        :param c: template context to render with
        :param defaults: mapping of form field values for htmlfill
        :param errors: optional mapping of validation errors. When given
            (even if empty) it is passed to htmlfill together with
            ``prefix_error=False``; when ``None`` htmlfill is called without
            any error arguments, exactly like the plain "show form" views.
        :return: ``Response`` with the filled HTML
        """
        data = render(
            self._edit_template, self._get_template_context(c), self.request)

        fill_kwargs = {
            'defaults': defaults,
            'encoding': "UTF-8",
            'force_defaults': False,
        }
        if errors is not None:
            fill_kwargs.update(errors=errors, prefix_error=False)

        html = formencode.htmlfill.render(data, **fill_kwargs)
        return Response(html)

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def user_group_members(self):
        """
        Return members of given user group

        :return: dict with a single ``members`` key holding one entry per
            member, sorted case-insensitively by username
        """
        self.load_default_context()
        user_group = self.db_user_group

        group_members = [
            {
                'id': user.user_id,
                'first_name': user.first_name,
                'last_name': user.last_name,
                'username': user.username,
                'icon_link': h.gravatar_url(user.email, 30),
                'value_display': h.person(user.email),
                'value': user.username,
                'value_type': 'user',
                'active': user.active,
            }
            for user in self._sorted_members(user_group)
        ]

        return {
            'members': group_members
        }

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def user_group_perms_summary(self):
        """
        Show the permissions summary page of the user group.

        :return: template context with ``c.permissions`` set to the
            summary returned by ``UserGroupModel.get_perms_summary``
        """
        c = self.load_default_context()
        c.user_group = self.db_user_group
        c.active = 'perms_summary'
        c.permissions = UserGroupModel().get_perms_summary(
            c.user_group.users_group_id)
        return self._get_template_context(c)

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def user_group_perms_summary_json(self):
        """
        Return the permissions summary of the user group as raw data.

        Same data as :meth:`user_group_perms_summary`, without the
        template context around it.
        """
        self.load_default_context()
        user_group = self.db_user_group
        return UserGroupModel().get_perms_summary(user_group.users_group_id)

    def _revoke_perms_on_yourself(self, form_result):
        """
        Tell whether the submitted form changes the current user's own
        permission to something other than ``usergroup.admin``.

        For each of ``perm_updates``, ``perm_additions`` and
        ``perm_deletions`` the first entry whose id (``entry[0]``) equals
        the current user's id is inspected; its permission is ``entry[1]``.

        :param form_result: validated form data containing the three
            ``perm_*`` sequences
        :return: ``True`` if any inspected entry carries a non-admin
            permission, ``False`` otherwise
        """
        own_user_id = self._rhodecode_user.user_id
        admin_perm = 'usergroup.admin'

        def _first_own_entry_is_not_admin(entries):
            # Build a real list: on Python 3 ``filter()`` returns a lazy
            # iterator which is always truthy and cannot be indexed.
            own_entries = [e for e in entries if own_user_id == int(e[0])]
            return bool(own_entries) and own_entries[0][1] != admin_perm

        # All three lists are materialized before any is inspected, so an
        # id that cannot be converted with ``int`` fails the same way
        # regardless of which list it is in.
        checks = [
            _first_own_entry_is_not_admin(form_result[key])
            for key in ('perm_updates', 'perm_additions', 'perm_deletions')
        ]
        return any(checks)

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @CSRFRequired()
    def user_group_update(self):
        """
        Validate and store the settings form of the user group.

        Updates the group through ``UserGroupModel.update``, writes audit
        entries for the edit and for each added/removed member, triggers a
        permission flush for the affected users and commits the session.

        :return: ``Response`` with the re-rendered form when validation
            fails
        :raises HTTPFound: redirect to the edit page, both after a
            successful update and after an unexpected error
        """
        _ = self.request.translate

        user_group = self.db_user_group
        user_group_id = user_group.users_group_id

        old_user_group_name = self.db_user_group_name
        # Falls back to the old name so the error flash below always has
        # something to show, even if validation never got far enough.
        new_user_group_name = old_user_group_name

        c = self.load_default_context()
        c.user_group = user_group
        c.group_members_obj = self._sorted_members(c.user_group)
        c.group_members = [
            (x.user_id, x.username) for x in c.group_members_obj]
        c.active = 'settings'

        users_group_form = UserGroupForm(
            self.request.translate, edit=True,
            old_data=c.user_group.get_dict(), allow_disabled=True)()

        # Snapshot taken before the update, stored with every audit entry.
        old_values = c.user_group.get_api_data()

        try:
            form_result = users_group_form.to_python(self.request.POST)
            # Member list is a nested structure, parsed by peppercorn.
            pstruct = peppercorn.parse(self.request.POST.items())
            form_result['users_group_members'] = pstruct['user_group_members']

            user_group, added_members, removed_members = \
                UserGroupModel().update(c.user_group, form_result)
            new_user_group_name = form_result['users_group_name']

            member_changes = (
                ('user_group.edit.member.add', added_members),
                ('user_group.edit.member.delete', removed_members),
            )
            for audit_action, member_ids in member_changes:
                for user_id in member_ids:
                    user = User.get(user_id)
                    user_data = user.get_api_data()
                    audit_logger.store_web(
                        audit_action,
                        action_data={'user': user_data,
                                     'old_data': old_values},
                        user=self._rhodecode_user)

            audit_logger.store_web(
                'user_group.edit', action_data={'old_data': old_values},
                user=self._rhodecode_user)

            h.flash(_('Updated user group %s') % new_user_group_name,
                    category='success')

            affected_user_ids = list(added_members + removed_members)

            name_changed = old_user_group_name != new_user_group_name
            if name_changed:
                owner = User.get_by_username(form_result['user'])
                owner_id = owner.user_id if owner \
                    else self._rhodecode_user.user_id
                affected_user_ids.append(self._rhodecode_user.user_id)
                affected_user_ids.append(owner_id)

            PermissionModel().trigger_permission_flush(affected_user_ids)

            Session().commit()
        except formencode.Invalid as errors:
            return self._render_edit_form(
                c, errors.value, errors=errors.error_dict or {})
        except Exception:
            log.exception("Exception during update of user group")
            h.flash(_('Error occurred during update of user group %s')
                    % new_user_group_name, category='error')

        raise HTTPFound(
            h.route_path('edit_user_group', user_group_id=user_group_id))

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @CSRFRequired()
    def user_group_delete(self):
        """
        Delete the user group.

        The POST flag ``force`` is forwarded to ``UserGroupModel.delete``.
        A ``UserGroupAssignedException`` is shown to the user verbatim, any
        other error is logged and reported with a generic message.

        :raises HTTPFound: always, redirecting to the user groups listing
        """
        _ = self.request.translate
        user_group = self.db_user_group

        self.load_default_context()
        force = str2bool(self.request.POST.get('force'))

        # Snapshot taken before deletion so the audit entry has the data.
        old_values = user_group.get_api_data()
        try:
            UserGroupModel().delete(user_group, force=force)
            # Logged under the user-group namespace like every other audit
            # entry of this view; the previous 'user.delete' described the
            # deletion of a user, not of a user group.
            # NOTE(review): confirm 'user_group.delete' is a registered
            # audit_logger action.
            audit_logger.store_web(
                'user_group.delete', action_data={'old_data': old_values},
                user=self._rhodecode_user)
            Session().commit()
            h.flash(_('Successfully deleted user group'), category='success')
        except UserGroupAssignedException as e:
            h.flash(str(e), category='error')
        except Exception:
            log.exception("Exception during deletion of user group")
            h.flash(_('An error occurred during deletion of user group'),
                    category='error')
        raise HTTPFound(h.route_path('user_groups'))

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def user_group_edit(self):
        """
        Show the settings form of the user group.

        The form is pre-filled from ``user_group.get_dict()``; the owner
        field falls back to the first super admin when the group has no
        owner.

        :return: ``Response`` with the filled form
        """
        user_group = self.db_user_group

        c = self.load_default_context()
        c.user_group = user_group
        c.group_members_obj = self._sorted_members(c.user_group)
        c.group_members = [
            (x.user_id, x.username) for x in c.group_members_obj]

        c.active = 'settings'

        defaults = user_group.get_dict()
        # fill owner
        if user_group.user:
            owner_name = user_group.user.username
        else:
            owner_name = User.get_first_super_admin().username
        defaults.update({'user': owner_name})

        return self._render_edit_form(c, defaults)

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def user_group_edit_perms(self):
        """
        Show the permissions form of the user group.

        Form defaults hold one ``u_perm_<user_id>`` entry per user
        permission and one ``g_perm_<users_group_id>`` entry per user-group
        permission, each mapped to the permission name.

        :return: ``Response`` with the filled form
        """
        user_group = self.db_user_group
        c = self.load_default_context()
        c.user_group = user_group
        c.active = 'perms'

        defaults = {}
        # fill user group users
        for p in c.user_group.user_user_group_to_perm:
            defaults['u_perm_%s' % p.user.user_id] = \
                p.permission.permission_name

        for p in c.user_group.user_group_user_group_to_perm:
            defaults['g_perm_%s' % p.user_group.users_group_id] = \
                p.permission.permission_name

        return self._render_edit_form(c, defaults)

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @CSRFRequired()
    def user_group_update_perms(self):
        """
        grant permission for given user group

        Non-admin users are refused when the form would change their own
        permission to something other than ``usergroup.admin``. Otherwise
        the permissions are updated, audited, committed and a permission
        flush is triggered for every affected user, including the members
        of affected user groups.

        :raises HTTPFound: always, redirecting back to the permissions page
        """
        _ = self.request.translate

        user_group = self.db_user_group
        user_group_id = user_group.users_group_id
        c = self.load_default_context()
        c.user_group = user_group
        form = UserGroupPermsForm(
            self.request.translate)().to_python(self.request.POST)

        perms_url = h.route_path(
            'edit_user_group_perms', user_group_id=user_group_id)

        if not self._rhodecode_user.is_admin:
            if self._revoke_perms_on_yourself(form):
                msg = _('Cannot change permission for yourself as admin')
                h.flash(msg, category='warning')
                raise HTTPFound(perms_url)

        try:
            changes = UserGroupModel().update_permissions(
                user_group,
                form['perm_additions'], form['perm_updates'],
                form['perm_deletions'])

        except RepoGroupAssignmentError:
            h.flash(_('Target group cannot be the same'), category='error')
            raise HTTPFound(perms_url)

        action_data = {
            'added': changes['added'],
            'updated': changes['updated'],
            'deleted': changes['deleted'],
        }
        audit_logger.store_web(
            'user_group.edit.permissions', action_data=action_data,
            user=self._rhodecode_user)

        Session().commit()
        h.flash(_('User Group permissions updated'), category='success')

        affected_user_ids = []
        all_changes = \
            changes['added'] + changes['updated'] + changes['deleted']
        for change in all_changes:
            if change['type'] == 'user':
                affected_user_ids.append(change['id'])
            if change['type'] == 'user_group':
                # Separate local name: do not shadow the group being edited.
                changed_group = UserGroup.get(safe_int(change['id']))
                if changed_group:
                    affected_user_ids.extend(
                        x.user_id for x in changed_group.members)

        PermissionModel().trigger_permission_flush(affected_user_ids)

        raise HTTPFound(perms_url)

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def user_group_global_perms_edit(self):
        """
        Show the global permissions form of the user group.

        Defaults are layered: the group's own data, then the default
        user's permissions with an ``_inherited`` suffix, then the group's
        default permissions.

        :return: ``Response`` with the filled form
        """
        user_group = self.db_user_group
        c = self.load_default_context()
        c.user_group = user_group
        c.active = 'global_perms'

        c.default_user = User.get_default_user()
        defaults = c.user_group.get_dict()
        defaults.update(c.default_user.get_default_perms(suffix='_inherited'))
        defaults.update(c.user_group.get_default_perms())

        return self._render_edit_form(c, defaults)

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @CSRFRequired()
    def user_group_global_perms_update(self):
        """
        Validate and store the global permissions form of the user group.

        The "inherit default permissions" flag is always stored; the
        individual permissions are only validated and stored when that
        flag is switched off.

        :return: ``Response`` with the re-rendered form when validation
            fails
        :raises HTTPFound: redirect to the global permissions page, both
            on success and after an unexpected error
        """
        _ = self.request.translate
        user_group = self.db_user_group
        user_group_id = self.db_user_group.users_group_id

        c = self.load_default_context()
        c.user_group = user_group
        c.active = 'global_perms'

        try:
            # first stage that verifies the checkbox
            _form = UserIndividualPermissionsForm(self.request.translate)
            form_result = _form.to_python(dict(self.request.POST))
            inherit_perms = form_result['inherit_default_permissions']
            user_group.inherit_default_permissions = inherit_perms
            Session().add(user_group)

            if not inherit_perms:
                # only update the individual ones if we un check the flag
                _form = UserPermissionsForm(
                    self.request.translate,
                    [x[0] for x in c.repo_create_choices],
                    [x[0] for x in c.repo_create_on_write_choices],
                    [x[0] for x in c.repo_group_create_choices],
                    [x[0] for x in c.user_group_create_choices],
                    [x[0] for x in c.fork_choices],
                    [x[0] for x in c.inherit_default_permission_choices])()

                form_result = _form.to_python(dict(self.request.POST))
                form_result.update(
                    {'perm_user_group_id': user_group.users_group_id})

                PermissionModel().update_user_group_permissions(form_result)

            Session().commit()
            h.flash(_('User Group global permissions updated successfully'),
                    category='success')

        except formencode.Invalid as errors:
            return self._render_edit_form(
                c, errors.value, errors=errors.error_dict or {})
        except Exception:
            log.exception("Exception during permissions saving")
            h.flash(_('An error occurred during permissions saving'),
                    category='error')

        raise HTTPFound(
            h.route_path('edit_user_group_global_perms',
                         user_group_id=user_group_id))

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    def user_group_edit_advanced(self):
        """
        Show the advanced page of the user group.

        Puts the group's members and the repositories, repository groups
        and review-rule user groups related to it on the template context,
        each sorted case-insensitively by name.

        :return: template context
        """
        user_group = self.db_user_group

        c = self.load_default_context()
        c.user_group = user_group
        c.active = 'advanced'
        c.group_members_obj = self._sorted_members(c.user_group)

        c.group_to_repos = sorted(
            (x.repository for x in c.user_group.users_group_repo_to_perm),
            key=lambda u: u.repo_name.lower())

        c.group_to_repo_groups = sorted(
            (x.group for x in c.user_group.users_group_repo_group_to_perm),
            key=lambda u: u.group_name.lower())

        c.group_to_review_rules = sorted(
            (x.users_group for x in c.user_group.user_group_review_rules),
            key=lambda u: u.users_group_name.lower())

        return self._get_template_context(c)

    @LoginRequired()
    @HasUserGroupPermissionAnyDecorator('usergroup.admin')
    @CSRFRequired()
    def user_group_edit_advanced_set_synchronization(self):
        """
        Toggle the ``extern_type`` marker stored in the group's data.

        If ``extern_type`` is currently set it is cleared; otherwise it is
        set to ``'manual'`` and the acting user's name is recorded under
        ``extern_type_set_by``.

        :raises HTTPFound: always, redirecting to the advanced page
        """
        _ = self.request.translate
        user_group = self.db_user_group
        user_group_id = user_group.users_group_id

        new_state = user_group.group_data
        if new_state.get('extern_type'):
            new_state['extern_type'] = None
        else:
            new_state['extern_type'] = 'manual'
            new_state['extern_type_set_by'] = self._rhodecode_user.username

        try:
            # Assign back explicitly so the change is stored on the model.
            user_group.group_data = new_state
            Session().add(user_group)
            Session().commit()

            h.flash(_('User Group synchronization updated successfully'),
                    category='success')
        except Exception:
            log.exception("Exception during sync settings saving")
            h.flash(_('An error occurred during synchronization update'),
                    category='error')

        raise HTTPFound(
            h.route_path('edit_user_group_advanced',
                         user_group_id=user_group_id))
|
|
|
|