##// END OF EJS Templates
goto-switcher: optimized performance and query capabilities....
goto-switcher: optimized performance and query capabilities. - Previous implementation had on significant bug. The use of LIMIT 20 was limiting results BEFORE auth checks. In case of large ammount of similarly named repositories user didn't had access too, the result goto search was empty. This was becuase first 20 items fetched didn't pass permission checks and final auth list was empty. To fix this we now use proper filtering for auth using SQL. It means we first check user allowed repositories, and add this as a filter so end result from database is already to only the accessible repositories.

File last commit:

r1829:ff4add41 default
r2038:2bdf9d4d default
Show More
repo_groups.py
405 lines | 16.1 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2017 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Repository groups controller for RhodeCode
"""
import logging
import formencode
from formencode import htmlfill
from pylons import request, tmpl_context as c, url
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _, ungettext
from rhodecode.lib import auth
from rhodecode.lib import helpers as h
from rhodecode.lib import audit_logger
from rhodecode.lib.ext_json import json
from rhodecode.lib.auth import (
LoginRequired, NotAnonymous, HasPermissionAll,
HasRepoGroupPermissionAll, HasRepoGroupPermissionAnyDecorator)
from rhodecode.lib.base import BaseController, render
from rhodecode.lib.utils2 import safe_int
from rhodecode.model.db import RepoGroup, User
from rhodecode.model.scm import RepoGroupList
from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.forms import RepoGroupForm, RepoGroupPermsForm
from rhodecode.model.meta import Session
log = logging.getLogger(__name__)
class RepoGroupsController(BaseController):
"""REST Controller styled on the Atom Publishing Protocol"""
@LoginRequired()
def __before__(self):
super(RepoGroupsController, self).__before__()
def __load_defaults(self, allow_empty_group=False, repo_group=None):
if self._can_create_repo_group():
# we're global admin, we're ok and we can create TOP level groups
allow_empty_group = True
# override the choices for this form, we need to filter choices
# and display only those we have ADMIN right
groups_with_admin_rights = RepoGroupList(
RepoGroup.query().all(),
perm_set=['group.admin'])
c.repo_groups = RepoGroup.groups_choices(
groups=groups_with_admin_rights,
show_empty_group=allow_empty_group)
if repo_group:
# exclude filtered ids
exclude_group_ids = [repo_group.group_id]
c.repo_groups = filter(lambda x: x[0] not in exclude_group_ids,
c.repo_groups)
c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
parent_group = repo_group.parent_group
add_parent_group = (parent_group and (
unicode(parent_group.group_id) not in c.repo_groups_choices))
if add_parent_group:
c.repo_groups_choices.append(unicode(parent_group.group_id))
c.repo_groups.append(RepoGroup._generate_choice(parent_group))
def __load_data(self, group_id):
"""
Load defaults settings for edit, and update
:param group_id:
"""
repo_group = RepoGroup.get_or_404(group_id)
data = repo_group.get_dict()
data['group_name'] = repo_group.name
# fill owner
if repo_group.user:
data.update({'user': repo_group.user.username})
else:
replacement_user = User.get_first_super_admin().username
data.update({'user': replacement_user})
# fill repository group users
for p in repo_group.repo_group_to_perm:
data.update({
'u_perm_%s' % p.user.user_id: p.permission.permission_name})
# fill repository group user groups
for p in repo_group.users_group_to_perm:
data.update({
'g_perm_%s' % p.users_group.users_group_id:
p.permission.permission_name})
# html and form expects -1 as empty parent group
data['group_parent_id'] = data['group_parent_id'] or -1
return data
def _revoke_perms_on_yourself(self, form_result):
_updates = filter(lambda u: c.rhodecode_user.user_id == int(u[0]),
form_result['perm_updates'])
_additions = filter(lambda u: c.rhodecode_user.user_id == int(u[0]),
form_result['perm_additions'])
_deletions = filter(lambda u: c.rhodecode_user.user_id == int(u[0]),
form_result['perm_deletions'])
admin_perm = 'group.admin'
if _updates and _updates[0][1] != admin_perm or \
_additions and _additions[0][1] != admin_perm or \
_deletions and _deletions[0][1] != admin_perm:
return True
return False
def _can_create_repo_group(self, parent_group_id=None):
is_admin = HasPermissionAll('hg.admin')('group create controller')
create_repo_group = HasPermissionAll(
'hg.repogroup.create.true')('group create controller')
if is_admin or (create_repo_group and not parent_group_id):
# we're global admin, or we have global repo group create
# permission
# we're ok and we can create TOP level groups
return True
elif parent_group_id:
# we check the permission if we can write to parent group
group = RepoGroup.get(parent_group_id)
group_name = group.group_name if group else None
if HasRepoGroupPermissionAll('group.admin')(
group_name, 'check if user is an admin of group'):
# we're an admin of passed in group, we're ok.
return True
else:
return False
return False
@NotAnonymous()
def index(self):
repo_group_list = RepoGroup.get_all_repo_groups()
_perms = ['group.admin']
repo_group_list_acl = RepoGroupList(repo_group_list, perm_set=_perms)
repo_group_data = RepoGroupModel().get_repo_groups_as_dict(
repo_group_list=repo_group_list_acl, admin=True)
c.data = json.dumps(repo_group_data)
return render('admin/repo_groups/repo_groups.mako')
# perm checks inside
@NotAnonymous()
@auth.CSRFRequired()
def create(self):
parent_group_id = safe_int(request.POST.get('group_parent_id'))
can_create = self._can_create_repo_group(parent_group_id)
self.__load_defaults()
# permissions for can create group based on parent_id are checked
# here in the Form
available_groups = map(lambda k: unicode(k[0]), c.repo_groups)
repo_group_form = RepoGroupForm(available_groups=available_groups,
can_create_in_root=can_create)()
try:
owner = c.rhodecode_user
form_result = repo_group_form.to_python(dict(request.POST))
repo_group = RepoGroupModel().create(
group_name=form_result['group_name_full'],
group_description=form_result['group_description'],
owner=owner.user_id,
copy_permissions=form_result['group_copy_permissions']
)
Session().flush()
repo_group_data = repo_group.get_api_data()
audit_logger.store_web(
'repo_group.create', action_data={'data': repo_group_data},
user=c.rhodecode_user)
Session().commit()
_new_group_name = form_result['group_name_full']
repo_group_url = h.link_to(
_new_group_name,
h.route_path('repo_group_home', repo_group_name=_new_group_name))
h.flash(h.literal(_('Created repository group %s')
% repo_group_url), category='success')
except formencode.Invalid as errors:
return htmlfill.render(
render('admin/repo_groups/repo_group_add.mako'),
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8",
force_defaults=False)
except Exception:
log.exception("Exception during creation of repository group")
h.flash(_('Error occurred during creation of repository group %s')
% request.POST.get('group_name'), category='error')
# TODO: maybe we should get back to the main view, not the admin one
return redirect(url('repo_groups', parent_group=parent_group_id))
# perm checks inside
@NotAnonymous()
def new(self):
# perm check for admin, create_group perm or admin of parent_group
parent_group_id = safe_int(request.GET.get('parent_group'))
if not self._can_create_repo_group(parent_group_id):
return abort(403)
self.__load_defaults()
return render('admin/repo_groups/repo_group_add.mako')
@HasRepoGroupPermissionAnyDecorator('group.admin')
@auth.CSRFRequired()
def update(self, group_name):
c.repo_group = RepoGroupModel()._get_repo_group(group_name)
can_create_in_root = self._can_create_repo_group()
show_root_location = can_create_in_root
if not c.repo_group.parent_group:
# this group don't have a parrent so we should show empty value
show_root_location = True
self.__load_defaults(allow_empty_group=show_root_location,
repo_group=c.repo_group)
repo_group_form = RepoGroupForm(
edit=True, old_data=c.repo_group.get_dict(),
available_groups=c.repo_groups_choices,
can_create_in_root=can_create_in_root, allow_disabled=True)()
old_values = c.repo_group.get_api_data()
try:
form_result = repo_group_form.to_python(dict(request.POST))
gr_name = form_result['group_name']
new_gr = RepoGroupModel().update(group_name, form_result)
audit_logger.store_web(
'repo_group.edit', action_data={'old_data': old_values},
user=c.rhodecode_user)
Session().commit()
h.flash(_('Updated repository group %s') % (gr_name,),
category='success')
# we now have new name !
group_name = new_gr.group_name
except formencode.Invalid as errors:
c.active = 'settings'
return htmlfill.render(
render('admin/repo_groups/repo_group_edit.mako'),
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8",
force_defaults=False)
except Exception:
log.exception("Exception during update or repository group")
h.flash(_('Error occurred during update of repository group %s')
% request.POST.get('group_name'), category='error')
return redirect(url('edit_repo_group', group_name=group_name))
@HasRepoGroupPermissionAnyDecorator('group.admin')
@auth.CSRFRequired()
def delete(self, group_name):
gr = c.repo_group = RepoGroupModel()._get_repo_group(group_name)
repos = gr.repositories.all()
if repos:
msg = ungettext(
'This group contains %(num)d repository and cannot be deleted',
'This group contains %(num)d repositories and cannot be'
' deleted',
len(repos)) % {'num': len(repos)}
h.flash(msg, category='warning')
return redirect(url('repo_groups'))
children = gr.children.all()
if children:
msg = ungettext(
'This group contains %(num)d subgroup and cannot be deleted',
'This group contains %(num)d subgroups and cannot be deleted',
len(children)) % {'num': len(children)}
h.flash(msg, category='warning')
return redirect(url('repo_groups'))
try:
old_values = gr.get_api_data()
RepoGroupModel().delete(group_name)
audit_logger.store_web(
'repo_group.delete', action_data={'old_data': old_values},
user=c.rhodecode_user)
Session().commit()
h.flash(_('Removed repository group %s') % group_name,
category='success')
except Exception:
log.exception("Exception during deletion of repository group")
h.flash(_('Error occurred during deletion of repository group %s')
% group_name, category='error')
return redirect(url('repo_groups'))
@HasRepoGroupPermissionAnyDecorator('group.admin')
def edit(self, group_name):
c.active = 'settings'
c.repo_group = RepoGroupModel()._get_repo_group(group_name)
# we can only allow moving empty group if it's already a top-level
# group, ie has no parents, or we're admin
can_create_in_root = self._can_create_repo_group()
show_root_location = can_create_in_root
if not c.repo_group.parent_group:
# this group don't have a parrent so we should show empty value
show_root_location = True
self.__load_defaults(allow_empty_group=show_root_location,
repo_group=c.repo_group)
defaults = self.__load_data(c.repo_group.group_id)
return htmlfill.render(
render('admin/repo_groups/repo_group_edit.mako'),
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
@HasRepoGroupPermissionAnyDecorator('group.admin')
def edit_repo_group_advanced(self, group_name):
c.active = 'advanced'
c.repo_group = RepoGroupModel()._get_repo_group(group_name)
return render('admin/repo_groups/repo_group_edit.mako')
@HasRepoGroupPermissionAnyDecorator('group.admin')
def edit_repo_group_perms(self, group_name):
c.active = 'perms'
c.repo_group = RepoGroupModel()._get_repo_group(group_name)
self.__load_defaults()
defaults = self.__load_data(c.repo_group.group_id)
return htmlfill.render(
render('admin/repo_groups/repo_group_edit.mako'),
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
@HasRepoGroupPermissionAnyDecorator('group.admin')
@auth.CSRFRequired()
def update_perms(self, group_name):
"""
Update permissions for given repository group
"""
c.repo_group = RepoGroupModel()._get_repo_group(group_name)
valid_recursive_choices = ['none', 'repos', 'groups', 'all']
form = RepoGroupPermsForm(valid_recursive_choices)().to_python(
request.POST)
if not c.rhodecode_user.is_admin:
if self._revoke_perms_on_yourself(form):
msg = _('Cannot change permission for yourself as admin')
h.flash(msg, category='warning')
return redirect(
url('edit_repo_group_perms', group_name=group_name))
# iterate over all members(if in recursive mode) of this groups and
# set the permissions !
# this can be potentially heavy operation
changes = RepoGroupModel().update_permissions(
c.repo_group,
form['perm_additions'], form['perm_updates'], form['perm_deletions'],
form['recursive'])
action_data = {
'added': changes['added'],
'updated': changes['updated'],
'deleted': changes['deleted'],
}
audit_logger.store_web(
'repo_group.edit.permissions', action_data=action_data,
user=c.rhodecode_user)
Session().commit()
h.flash(_('Repository Group permissions updated'), category='success')
return redirect(url('edit_repo_group_perms', group_name=group_name))