repo_groups.py
406 lines
| 16.2 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
r1271 | # Copyright (C) 2010-2017 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Repository groups controller for RhodeCode | ||||
""" | ||||
import logging | ||||
import formencode | ||||
from formencode import htmlfill | ||||
from pylons import request, tmpl_context as c, url | ||||
from pylons.controllers.util import abort, redirect | ||||
from pylons.i18n.translation import _, ungettext | ||||
from rhodecode.lib import auth | ||||
from rhodecode.lib import helpers as h | ||||
r1799 | from rhodecode.lib import audit_logger | |||
r1 | from rhodecode.lib.ext_json import json | |||
from rhodecode.lib.auth import ( | ||||
LoginRequired, NotAnonymous, HasPermissionAll, | ||||
HasRepoGroupPermissionAll, HasRepoGroupPermissionAnyDecorator) | ||||
from rhodecode.lib.base import BaseController, render | ||||
r1796 | from rhodecode.lib.utils2 import safe_int | |||
r1 | from rhodecode.model.db import RepoGroup, User | |||
from rhodecode.model.scm import RepoGroupList | ||||
from rhodecode.model.repo_group import RepoGroupModel | ||||
from rhodecode.model.forms import RepoGroupForm, RepoGroupPermsForm | ||||
from rhodecode.model.meta import Session | ||||
# Module-level logger, named after this module so records can be filtered
# per controller.
log = logging.getLogger(__name__)
class RepoGroupsController(BaseController):
    """
    REST Controller styled on the Atom Publishing Protocol

    Admin views for repository groups: listing, creation, settings and
    permission editing, and deletion.
    """

    @LoginRequired()
    def __before__(self):
        """Run the base controller pre-request hook (login is required)."""
        super(RepoGroupsController, self).__before__()

    def __load_defaults(self, allow_empty_group=False, repo_group=None):
        """
        Populate ``c.repo_groups`` with the parent group choices of the form.

        Only groups on which the current user has ``group.admin`` are
        offered. When ``repo_group`` is given, the group itself is removed
        from the choices, ``c.repo_groups_choices`` is set to the unicode ids
        of the choices, and the current parent of the group is added to both
        if it is not already present.

        :param allow_empty_group: offer the empty (top level) location;
            forced to True when the user may create top level groups
        :param repo_group: repository group being edited, if any
        """
        if self._can_create_repo_group():
            # we're global admin, we're ok and we can create TOP level groups
            allow_empty_group = True

        # override the choices for this form, we need to filter choices
        # and display only those we have ADMIN right
        groups_with_admin_rights = RepoGroupList(
            RepoGroup.query().all(),
            perm_set=['group.admin'])
        c.repo_groups = RepoGroup.groups_choices(
            groups=groups_with_admin_rights,
            show_empty_group=allow_empty_group)

        if not repo_group:
            return

        # exclude filtered ids; real lists are built here (instead of
        # filter()/map() results) because both are appended to below
        exclude_group_ids = [repo_group.group_id]
        c.repo_groups = [
            choice for choice in c.repo_groups
            if choice[0] not in exclude_group_ids]
        c.repo_groups_choices = [
            unicode(choice[0]) for choice in c.repo_groups]

        parent_group = repo_group.parent_group
        add_parent_group = (parent_group and (
            unicode(parent_group.group_id) not in c.repo_groups_choices))
        if add_parent_group:
            # keep the current parent selectable even when the user has no
            # admin right on it, so the form can be re-submitted unchanged
            c.repo_groups_choices.append(unicode(parent_group.group_id))
            c.repo_groups.append(RepoGroup._generate_choice(parent_group))

    def __load_data(self, group_id):
        """
        Load defaults settings for edit, and update

        :param group_id: id of the repository group; a missing group is
            handled by ``RepoGroup.get_or_404``
        :return: form defaults built from ``repo_group.get_dict()`` plus
            ``group_name``, ``user`` and one ``u_perm_<user id>`` /
            ``g_perm_<user group id>`` key per assigned permission
        """
        repo_group = RepoGroup.get_or_404(group_id)
        data = repo_group.get_dict()
        data['group_name'] = repo_group.name

        # fill owner; fall back to the first super admin for owner-less groups
        if repo_group.user:
            data['user'] = repo_group.user.username
        else:
            data['user'] = User.get_first_super_admin().username

        # fill repository group users
        for perm in repo_group.repo_group_to_perm:
            data['u_perm_%s' % perm.user.user_id] = \
                perm.permission.permission_name

        # fill repository group user groups
        for perm in repo_group.users_group_to_perm:
            data['g_perm_%s' % perm.users_group.users_group_id] = \
                perm.permission.permission_name

        # html and form expects -1 as empty parent group
        data['group_parent_id'] = data['group_parent_id'] or -1
        return data

    def _revoke_perms_on_yourself(self, form_result):
        """
        Tell whether the submitted permission changes would leave the
        current user with something other than ``group.admin``.

        Every entry of ``perm_updates``, ``perm_additions`` and
        ``perm_deletions`` whose first item equals the current user id is
        inspected, not only the first match of each list.

        NOTE(review): entries are matched on the id alone; if these lists can
        also carry user group ids, a user group sharing the user's numeric id
        would match as well -- confirm against the form schema.

        :param form_result: validated permission form data; each entry is
            read as ``entry[0]`` (member id) and ``entry[1]`` (permission)
        :return: True when any own entry names a non admin permission
        """
        admin_perm = 'group.admin'
        own_user_id = c.rhodecode_user.user_id

        def _own_entries(entries):
            return [entry for entry in entries
                    if own_user_id == int(entry[0])]

        # all three lists are scanned up front, so a malformed id in any of
        # them still raises exactly as before
        own_changes = (
            _own_entries(form_result['perm_updates']) +
            _own_entries(form_result['perm_additions']) +
            _own_entries(form_result['perm_deletions']))
        return any(entry[1] != admin_perm for entry in own_changes)

    def _can_create_repo_group(self, parent_group_id=None):
        """
        Check whether the current user may create a repository group.

        :param parent_group_id: id of the intended parent group, or None for
            a top level group
        :return: True for global admins, for users holding
            ``hg.repogroup.create.true`` when no parent is requested, and
            for admins of the requested parent group; False otherwise
        """
        is_admin = HasPermissionAll('hg.admin')('group create controller')
        create_repo_group = HasPermissionAll(
            'hg.repogroup.create.true')('group create controller')
        if is_admin or (create_repo_group and not parent_group_id):
            # we're global admin, or we have global repo group create
            # permission
            # we're ok and we can create TOP level groups
            return True

        if not parent_group_id:
            return False

        # we check the permission if we can write to parent group
        group = RepoGroup.get(parent_group_id)
        if not group:
            # an unknown parent cannot grant anything; do not ask the
            # permission checker about a None group name
            return False
        # True only when we're an admin of the passed in group
        return bool(HasRepoGroupPermissionAll('group.admin')(
            group.group_name, 'check if user is an admin of group'))

    def _root_location_flags(self, repo_group):
        """
        Work out the root location options for editing ``repo_group``.

        :param repo_group: repository group being edited
        :return: tuple ``(can_create_in_root, show_root_location)``
        """
        can_create_in_root = self._can_create_repo_group()
        show_root_location = can_create_in_root
        if not repo_group.parent_group:
            # this group doesn't have a parent so we should show empty value
            show_root_location = True
        return can_create_in_root, show_root_location

    @NotAnonymous()
    def index(self):
        """
        Render the list of repository groups the user has ``group.admin``
        on; the rows are passed to the template as JSON in ``c.data``.
        """
        repo_group_list = RepoGroup.get_all_repo_groups()
        _perms = ['group.admin']
        repo_group_list_acl = RepoGroupList(repo_group_list, perm_set=_perms)
        repo_group_data = RepoGroupModel().get_repo_groups_as_dict(
            repo_group_list=repo_group_list_acl, admin=True)
        c.data = json.dumps(repo_group_data)
        return render('admin/repo_groups/repo_groups.mako')

    # perm checks inside
    @NotAnonymous()
    @auth.CSRFRequired()
    def create(self):
        """
        Create a repository group from the POSTed form.

        Validation errors re-render the "add" form with the submitted
        values; any other failure is logged and flashed. Success and
        non-validation failures both redirect to the group list.
        """
        parent_group_id = safe_int(request.POST.get('group_parent_id'))
        can_create = self._can_create_repo_group(parent_group_id)

        self.__load_defaults()
        # permissions for can create group based on parent_id are checked
        # here in the Form
        available_groups = [unicode(choice[0]) for choice in c.repo_groups]
        repo_group_form = RepoGroupForm(available_groups=available_groups,
                                        can_create_in_root=can_create)()
        try:
            owner = c.rhodecode_user
            form_result = repo_group_form.to_python(dict(request.POST))
            repo_group = RepoGroupModel().create(
                group_name=form_result['group_name_full'],
                group_description=form_result['group_description'],
                owner=owner.user_id,
                copy_permissions=form_result['group_copy_permissions']
            )
            Session().commit()

            repo_group_data = repo_group.get_api_data()
            _new_group_name = form_result['group_name_full']

            audit_logger.store(
                action='repo_group.create',
                action_data={'repo_group_data': repo_group_data},
                user=c.rhodecode_user, commit=True)

            repo_group_url = h.link_to(
                _new_group_name,
                h.route_path('repo_group_home',
                             repo_group_name=_new_group_name))
            h.flash(h.literal(_('Created repository group %s')
                              % repo_group_url), category='success')

        except formencode.Invalid as errors:
            return htmlfill.render(
                render('admin/repo_groups/repo_group_add.mako'),
                defaults=errors.value,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)
        except Exception:
            log.exception("Exception during creation of repository group")
            h.flash(_('Error occurred during creation of repository group %s')
                    % request.POST.get('group_name'), category='error')

        # TODO: maybe we should get back to the main view, not the admin one
        return redirect(url('repo_groups', parent_group=parent_group_id))

    # perm checks inside
    @NotAnonymous()
    def new(self):
        """
        Render the "add repository group" form, or abort with 403 when the
        user may not create a group under the requested ``parent_group``.
        """
        # perm check for admin, create_group perm or admin of parent_group
        parent_group_id = safe_int(request.GET.get('parent_group'))
        if not self._can_create_repo_group(parent_group_id):
            return abort(403)

        self.__load_defaults()
        return render('admin/repo_groups/repo_group_add.mako')

    @HasRepoGroupPermissionAnyDecorator('group.admin')
    @auth.CSRFRequired()
    def update(self, group_name):
        """
        Apply the POSTed settings form to a repository group.

        :param group_name: current name of the repository group
        :return: the re-rendered edit form on validation errors, otherwise
            a redirect to the edit page (under the new name if the update
            succeeded)
        """
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
        can_create_in_root, show_root_location = self._root_location_flags(
            c.repo_group)
        self.__load_defaults(allow_empty_group=show_root_location,
                             repo_group=c.repo_group)

        repo_group_form = RepoGroupForm(
            edit=True, old_data=c.repo_group.get_dict(),
            available_groups=c.repo_groups_choices,
            can_create_in_root=can_create_in_root, allow_disabled=True)()

        # snapshot taken before the update so the audit entry holds old data
        old_values = c.repo_group.get_api_data()
        try:
            form_result = repo_group_form.to_python(dict(request.POST))
            gr_name = form_result['group_name']
            new_gr = RepoGroupModel().update(group_name, form_result)

            audit_logger.store(
                'repo_group.edit', action_data={'old_data': old_values},
                user=c.rhodecode_user)

            Session().commit()
            h.flash(_('Updated repository group %s') % (gr_name,),
                    category='success')
            # we now have new name !
            group_name = new_gr.group_name
        except formencode.Invalid as errors:
            c.active = 'settings'
            return htmlfill.render(
                render('admin/repo_groups/repo_group_edit.mako'),
                defaults=errors.value,
                errors=errors.error_dict or {},
                prefix_error=False,
                encoding="UTF-8",
                force_defaults=False)
        except Exception:
            log.exception("Exception during update of repository group")
            h.flash(_('Error occurred during update of repository group %s')
                    % request.POST.get('group_name'), category='error')

        return redirect(url('edit_repo_group', group_name=group_name))

    @HasRepoGroupPermissionAnyDecorator('group.admin')
    @auth.CSRFRequired()
    def delete(self, group_name):
        """
        Delete an empty repository group.

        Groups that still contain repositories or subgroups are not deleted;
        a warning is flashed instead. Always redirects to the group list.

        :param group_name: name of the repository group to delete
        """
        gr = c.repo_group = RepoGroupModel()._get_repo_group(group_name)

        repos = gr.repositories.all()
        if repos:
            msg = ungettext(
                'This group contains %(num)d repository and cannot be deleted',
                'This group contains %(num)d repositories and cannot be'
                ' deleted',
                len(repos)) % {'num': len(repos)}
            h.flash(msg, category='warning')
            return redirect(url('repo_groups'))

        children = gr.children.all()
        if children:
            msg = ungettext(
                'This group contains %(num)d subgroup and cannot be deleted',
                'This group contains %(num)d subgroups and cannot be deleted',
                len(children)) % {'num': len(children)}
            h.flash(msg, category='warning')
            return redirect(url('repo_groups'))

        try:
            # snapshot taken before deletion so the audit entry holds old data
            old_values = gr.get_api_data()
            RepoGroupModel().delete(group_name)

            audit_logger.store(
                'repo_group.delete',
                action_data={'old_data': old_values,
                             'source': audit_logger.SOURCE_WEB},
                user=c.rhodecode_user)

            Session().commit()
            h.flash(_('Removed repository group %s') % group_name,
                    category='success')
        except Exception:
            log.exception("Exception during deletion of repository group")
            h.flash(_('Error occurred during deletion of repository group %s')
                    % group_name, category='error')

        return redirect(url('repo_groups'))

    @HasRepoGroupPermissionAnyDecorator('group.admin')
    def edit(self, group_name):
        """
        Render the settings tab of the repository group edit page.

        :param group_name: name of the repository group to edit
        """
        c.active = 'settings'
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)

        # we can only allow moving empty group if it's already a top-level
        # group, ie has no parents, or we're admin
        _can_create_in_root, show_root_location = self._root_location_flags(
            c.repo_group)
        self.__load_defaults(allow_empty_group=show_root_location,
                             repo_group=c.repo_group)
        defaults = self.__load_data(c.repo_group.group_id)

        return htmlfill.render(
            render('admin/repo_groups/repo_group_edit.mako'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False
        )

    @HasRepoGroupPermissionAnyDecorator('group.admin')
    def edit_repo_group_advanced(self, group_name):
        """
        Render the advanced tab of the repository group edit page.

        :param group_name: name of the repository group to edit
        """
        c.active = 'advanced'
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
        return render('admin/repo_groups/repo_group_edit.mako')

    @HasRepoGroupPermissionAnyDecorator('group.admin')
    def edit_repo_group_perms(self, group_name):
        """
        Render the permissions tab of the repository group edit page.

        :param group_name: name of the repository group to edit
        """
        c.active = 'perms'
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
        self.__load_defaults()
        defaults = self.__load_data(c.repo_group.group_id)

        return htmlfill.render(
            render('admin/repo_groups/repo_group_edit.mako'),
            defaults=defaults,
            encoding="UTF-8",
            force_defaults=False
        )

    @HasRepoGroupPermissionAnyDecorator('group.admin')
    @auth.CSRFRequired()
    def update_perms(self, group_name):
        """
        Update permissions for given repository group

        Users who are not admins (``c.rhodecode_user.is_admin``) are refused
        when the change would revoke their own admin permission on the
        group.

        :param group_name: name of the repository group
        """
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
        valid_recursive_choices = ['none', 'repos', 'groups', 'all']
        form = RepoGroupPermsForm(valid_recursive_choices)().to_python(
            request.POST)

        if not c.rhodecode_user.is_admin:
            if self._revoke_perms_on_yourself(form):
                msg = _('Cannot change permission for yourself as admin')
                h.flash(msg, category='warning')
                return redirect(
                    url('edit_repo_group_perms', group_name=group_name))

        # iterate over all members(if in recursive mode) of this groups and
        # set the permissions !
        # this can be potentially heavy operation
        changes = RepoGroupModel().update_permissions(
            c.repo_group,
            form['perm_additions'], form['perm_updates'],
            form['perm_deletions'], form['recursive'])

        action_data = {
            'added': changes['added'],
            'updated': changes['updated'],
            'deleted': changes['deleted'],
            'source': audit_logger.SOURCE_WEB
        }
        audit_logger.store(
            'repo_group.edit.permissions', action_data=action_data,
            user=c.rhodecode_user)

        Session().commit()
        h.flash(_('Repository Group permissions updated'), category='success')
        return redirect(url('edit_repo_group_perms', group_name=group_name))