repo_groups.py
471 lines
| 19.5 KiB
| text/x-python
|
PythonLexer
Bradley M. Kuhn
|
r4116 | # -*- coding: utf-8 -*- | ||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
""" | ||||
rhodecode.controllers.admin.repo_groups | ||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
Repository groups controller for RhodeCode | ||||
:created_on: Mar 23, 2010 | ||||
:author: marcink | ||||
:copyright: (c) 2013 RhodeCode GmbH. | ||||
:license: GPLv3, see LICENSE for more details. | ||||
""" | ||||
import logging | ||||
import traceback | ||||
import formencode | ||||
import itertools | ||||
from formencode import htmlfill | ||||
from pylons import request, tmpl_context as c, url | ||||
from pylons.controllers.util import abort, redirect | ||||
from pylons.i18n.translation import _, ungettext | ||||
from sqlalchemy.exc import IntegrityError | ||||
import rhodecode | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.lib.compat import json | ||||
from rhodecode.lib.auth import LoginRequired, HasPermissionAnyDecorator,\ | ||||
HasRepoGroupPermissionAnyDecorator, HasRepoGroupPermissionAll,\ | ||||
HasPermissionAll | ||||
from rhodecode.lib.base import BaseController, render | ||||
from rhodecode.model.db import RepoGroup, Repository | ||||
from rhodecode.model.scm import RepoGroupList | ||||
from rhodecode.model.repo_group import RepoGroupModel | ||||
from rhodecode.model.forms import RepoGroupForm, RepoGroupPermsForm | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.repo import RepoModel | ||||
from webob.exc import HTTPInternalServerError, HTTPNotFound | ||||
from rhodecode.lib.utils2 import str2bool, safe_int | ||||
from sqlalchemy.sql.expression import func | ||||
log = logging.getLogger(__name__) | ||||
class RepoGroupsController(BaseController): | ||||
"""REST Controller styled on the Atom Publishing Protocol""" | ||||
@LoginRequired() | ||||
def __before__(self): | ||||
super(RepoGroupsController, self).__before__() | ||||
def __load_defaults(self, allow_empty_group=False, exclude_group_ids=[]): | ||||
if HasPermissionAll('hg.admin')('group edit'): | ||||
#we're global admin, we're ok and we can create TOP level groups | ||||
allow_empty_group = True | ||||
#override the choices for this form, we need to filter choices | ||||
#and display only those we have ADMIN right | ||||
groups_with_admin_rights = RepoGroupList(RepoGroup.query().all(), | ||||
perm_set=['group.admin']) | ||||
c.repo_groups = RepoGroup.groups_choices(groups=groups_with_admin_rights, | ||||
show_empty_group=allow_empty_group) | ||||
# exclude filtered ids | ||||
c.repo_groups = filter(lambda x: x[0] not in exclude_group_ids, | ||||
c.repo_groups) | ||||
c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups) | ||||
repo_model = RepoModel() | ||||
c.users_array = repo_model.get_users_js() | ||||
c.user_groups_array = repo_model.get_user_groups_js() | ||||
def __load_data(self, group_id): | ||||
""" | ||||
Load defaults settings for edit, and update | ||||
:param group_id: | ||||
""" | ||||
repo_group = RepoGroup.get_or_404(group_id) | ||||
data = repo_group.get_dict() | ||||
data['group_name'] = repo_group.name | ||||
# fill repository group users | ||||
for p in repo_group.repo_group_to_perm: | ||||
data.update({'u_perm_%s' % p.user.username: | ||||
p.permission.permission_name}) | ||||
# fill repository group groups | ||||
for p in repo_group.users_group_to_perm: | ||||
data.update({'g_perm_%s' % p.users_group.users_group_name: | ||||
p.permission.permission_name}) | ||||
return data | ||||
def _revoke_perms_on_yourself(self, form_result): | ||||
_up = filter(lambda u: c.rhodecode_user.username == u[0], | ||||
form_result['perms_updates']) | ||||
_new = filter(lambda u: c.rhodecode_user.username == u[0], | ||||
form_result['perms_new']) | ||||
if _new and _new[0][1] != 'group.admin' or _up and _up[0][1] != 'group.admin': | ||||
return True | ||||
return False | ||||
def index(self, format='html'): | ||||
"""GET /repo_groups: All items in the collection""" | ||||
# url('repos_groups') | ||||
_list = RepoGroup.query()\ | ||||
.order_by(func.lower(RepoGroup.group_name))\ | ||||
.all() | ||||
group_iter = RepoGroupList(_list, perm_set=['group.admin']) | ||||
repo_groups_data = [] | ||||
total_records = len(group_iter) | ||||
_tmpl_lookup = rhodecode.CONFIG['pylons.app_globals'].mako_lookup | ||||
template = _tmpl_lookup.get_template('data_table/_dt_elements.html') | ||||
repo_group_name = lambda repo_group_name, children_groups: ( | ||||
template.get_def("repo_group_name") | ||||
.render(repo_group_name, children_groups, _=_, h=h, c=c) | ||||
) | ||||
repo_group_actions = lambda repo_group_id, repo_group_name, gr_count: ( | ||||
template.get_def("repo_group_actions") | ||||
.render(repo_group_id, repo_group_name, gr_count, _=_, h=h, c=c, | ||||
ungettext=ungettext) | ||||
) | ||||
for repo_gr in group_iter: | ||||
children_groups = map(h.safe_unicode, | ||||
itertools.chain((g.name for g in repo_gr.parents), | ||||
(x.name for x in [repo_gr]))) | ||||
repo_count = repo_gr.repositories.count() | ||||
repo_groups_data.append({ | ||||
"raw_name": repo_gr.group_name, | ||||
"group_name": repo_group_name(repo_gr.group_name, children_groups), | ||||
"desc": repo_gr.group_description, | ||||
"repos": repo_count, | ||||
"owner": h.person(repo_gr.user.username), | ||||
"action": repo_group_actions(repo_gr.group_id, repo_gr.group_name, | ||||
repo_count) | ||||
}) | ||||
c.data = json.dumps({ | ||||
"totalRecords": total_records, | ||||
"startIndex": 0, | ||||
"sort": None, | ||||
"dir": "asc", | ||||
"records": repo_groups_data | ||||
}) | ||||
return render('admin/repo_groups/repo_groups.html') | ||||
def create(self): | ||||
"""POST /repo_groups: Create a new item""" | ||||
# url('repos_groups') | ||||
self.__load_defaults() | ||||
# permissions for can create group based on parent_id are checked | ||||
# here in the Form | ||||
repo_group_form = RepoGroupForm(available_groups= | ||||
map(lambda k: unicode(k[0]), c.repo_groups))() | ||||
try: | ||||
form_result = repo_group_form.to_python(dict(request.POST)) | ||||
RepoGroupModel().create( | ||||
group_name=form_result['group_name'], | ||||
group_description=form_result['group_description'], | ||||
parent=form_result['group_parent_id'], | ||||
owner=self.rhodecode_user.user_id, | ||||
copy_permissions=form_result['group_copy_permissions'] | ||||
) | ||||
Session().commit() | ||||
h.flash(_('Created repository group %s') \ | ||||
% form_result['group_name'], category='success') | ||||
#TODO: in futureaction_logger(, '', '', '', self.sa) | ||||
except formencode.Invalid, errors: | ||||
return htmlfill.render( | ||||
render('admin/repo_groups/repo_group_add.html'), | ||||
defaults=errors.value, | ||||
errors=errors.error_dict or {}, | ||||
prefix_error=False, | ||||
encoding="UTF-8") | ||||
except Exception: | ||||
log.error(traceback.format_exc()) | ||||
h.flash(_('Error occurred during creation of repository group %s') \ | ||||
% request.POST.get('group_name'), category='error') | ||||
parent_group_id = form_result['group_parent_id'] | ||||
#TODO: maybe we should get back to the main view, not the admin one | ||||
return redirect(url('repos_groups', parent_group=parent_group_id)) | ||||
def new(self): | ||||
"""GET /repo_groups/new: Form to create a new item""" | ||||
# url('new_repos_group') | ||||
if HasPermissionAll('hg.admin')('group create'): | ||||
#we're global admin, we're ok and we can create TOP level groups | ||||
pass | ||||
else: | ||||
# we pass in parent group into creation form, thus we know | ||||
# what would be the group, we can check perms here ! | ||||
group_id = safe_int(request.GET.get('parent_group')) | ||||
group = RepoGroup.get(group_id) if group_id else None | ||||
group_name = group.group_name if group else None | ||||
if HasRepoGroupPermissionAll('group.admin')(group_name, 'group create'): | ||||
pass | ||||
else: | ||||
return abort(403) | ||||
self.__load_defaults() | ||||
return render('admin/repo_groups/repo_group_add.html') | ||||
@HasRepoGroupPermissionAnyDecorator('group.admin') | ||||
def update(self, group_name): | ||||
"""PUT /repo_groups/group_name: Update an existing item""" | ||||
# Forms posted to this method should contain a hidden field: | ||||
# <input type="hidden" name="_method" value="PUT" /> | ||||
# Or using helpers: | ||||
# h.form(url('repos_group', group_name=GROUP_NAME), | ||||
# method='put') | ||||
# url('repos_group', group_name=GROUP_NAME) | ||||
c.repo_group = RepoGroupModel()._get_repo_group(group_name) | ||||
if HasPermissionAll('hg.admin')('group edit'): | ||||
#we're global admin, we're ok and we can create TOP level groups | ||||
allow_empty_group = True | ||||
elif not c.repo_group.parent_group: | ||||
allow_empty_group = True | ||||
else: | ||||
allow_empty_group = False | ||||
self.__load_defaults(allow_empty_group=allow_empty_group, | ||||
exclude_group_ids=[c.repo_group.group_id]) | ||||
repo_group_form = RepoGroupForm( | ||||
edit=True, | ||||
old_data=c.repo_group.get_dict(), | ||||
available_groups=c.repo_groups_choices, | ||||
can_create_in_root=allow_empty_group, | ||||
)() | ||||
try: | ||||
form_result = repo_group_form.to_python(dict(request.POST)) | ||||
new_gr = RepoGroupModel().update(group_name, form_result) | ||||
Session().commit() | ||||
h.flash(_('Updated repository group %s') \ | ||||
% form_result['group_name'], category='success') | ||||
# we now have new name ! | ||||
group_name = new_gr.group_name | ||||
#TODO: in future action_logger(, '', '', '', self.sa) | ||||
except formencode.Invalid, errors: | ||||
return htmlfill.render( | ||||
render('admin/repo_groups/repo_group_edit.html'), | ||||
defaults=errors.value, | ||||
errors=errors.error_dict or {}, | ||||
prefix_error=False, | ||||
encoding="UTF-8") | ||||
except Exception: | ||||
log.error(traceback.format_exc()) | ||||
h.flash(_('Error occurred during update of repository group %s') \ | ||||
% request.POST.get('group_name'), category='error') | ||||
return redirect(url('edit_repo_group', group_name=group_name)) | ||||
@HasRepoGroupPermissionAnyDecorator('group.admin') | ||||
def delete(self, group_name): | ||||
"""DELETE /repo_groups/group_name: Delete an existing item""" | ||||
# Forms posted to this method should contain a hidden field: | ||||
# <input type="hidden" name="_method" value="DELETE" /> | ||||
# Or using helpers: | ||||
# h.form(url('repos_group', group_name=GROUP_NAME), | ||||
# method='delete') | ||||
# url('repos_group', group_name=GROUP_NAME) | ||||
gr = c.repo_group = RepoGroupModel()._get_repo_group(group_name) | ||||
repos = gr.repositories.all() | ||||
if repos: | ||||
h.flash(_('This group contains %s repositores and cannot be ' | ||||
'deleted') % len(repos), category='warning') | ||||
return redirect(url('repos_groups')) | ||||
children = gr.children.all() | ||||
if children: | ||||
h.flash(_('This group contains %s subgroups and cannot be deleted' | ||||
% (len(children))), category='warning') | ||||
return redirect(url('repos_groups')) | ||||
try: | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
h.flash(_('Removed repository group %s') % group_name, | ||||
category='success') | ||||
#TODO: in future action_logger(, '', '', '', self.sa) | ||||
except Exception: | ||||
log.error(traceback.format_exc()) | ||||
h.flash(_('Error occurred during deletion of repository group %s') | ||||
% group_name, category='error') | ||||
return redirect(url('repos_groups')) | ||||
def show_by_name(self, group_name): | ||||
""" | ||||
This is a proxy that does a lookup group_name -> id, and shows | ||||
the group by id view instead | ||||
""" | ||||
group_name = group_name.rstrip('/') | ||||
id_ = RepoGroup.get_by_group_name(group_name) | ||||
if id_: | ||||
return self.show(group_name) | ||||
raise HTTPNotFound | ||||
@HasRepoGroupPermissionAnyDecorator('group.read', 'group.write', | ||||
'group.admin') | ||||
def show(self, group_name): | ||||
"""GET /repo_groups/group_name: Show a specific item""" | ||||
# url('repos_group', group_name=GROUP_NAME) | ||||
c.active = 'settings' | ||||
c.group = c.repo_group = RepoGroupModel()._get_repo_group(group_name) | ||||
c.group_repos = c.group.repositories.all() | ||||
#overwrite our cached list with current filter | ||||
gr_filter = c.group_repos | ||||
c.repo_cnt = 0 | ||||
groups = RepoGroup.query().order_by(RepoGroup.group_name)\ | ||||
.filter(RepoGroup.group_parent_id == c.group.group_id).all() | ||||
c.groups = self.scm_model.get_repo_groups(groups) | ||||
c.repos_list = Repository.query()\ | ||||
.filter(Repository.group_id == c.group.group_id)\ | ||||
.order_by(func.lower(Repository.repo_name))\ | ||||
.all() | ||||
repos_data = RepoModel().get_repos_as_dict(repos_list=c.repos_list, | ||||
admin=False) | ||||
#json used to render the grid | ||||
c.data = json.dumps(repos_data) | ||||
return render('admin/repo_groups/repo_group_show.html') | ||||
@HasRepoGroupPermissionAnyDecorator('group.admin') | ||||
def edit(self, group_name): | ||||
"""GET /repo_groups/group_name/edit: Form to edit an existing item""" | ||||
# url('edit_repo_group', group_name=GROUP_NAME) | ||||
c.active = 'settings' | ||||
c.repo_group = RepoGroupModel()._get_repo_group(group_name) | ||||
#we can only allow moving empty group if it's already a top-level | ||||
#group, ie has no parents, or we're admin | ||||
if HasPermissionAll('hg.admin')('group edit'): | ||||
#we're global admin, we're ok and we can create TOP level groups | ||||
allow_empty_group = True | ||||
elif not c.repo_group.parent_group: | ||||
allow_empty_group = True | ||||
else: | ||||
allow_empty_group = False | ||||
self.__load_defaults(allow_empty_group=allow_empty_group, | ||||
exclude_group_ids=[c.repo_group.group_id]) | ||||
defaults = self.__load_data(c.repo_group.group_id) | ||||
return htmlfill.render( | ||||
render('admin/repo_groups/repo_group_edit.html'), | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
@HasRepoGroupPermissionAnyDecorator('group.admin') | ||||
def edit_repo_group_advanced(self, group_name): | ||||
"""GET /repo_groups/group_name/edit: Form to edit an existing item""" | ||||
# url('edit_repo_group', group_name=GROUP_NAME) | ||||
c.active = 'advanced' | ||||
c.repo_group = RepoGroupModel()._get_repo_group(group_name) | ||||
return render('admin/repo_groups/repo_group_edit.html') | ||||
@HasRepoGroupPermissionAnyDecorator('group.admin') | ||||
def edit_repo_group_perms(self, group_name): | ||||
"""GET /repo_groups/group_name/edit: Form to edit an existing item""" | ||||
# url('edit_repo_group', group_name=GROUP_NAME) | ||||
c.active = 'perms' | ||||
c.repo_group = RepoGroupModel()._get_repo_group(group_name) | ||||
self.__load_defaults() | ||||
defaults = self.__load_data(c.repo_group.group_id) | ||||
return htmlfill.render( | ||||
render('admin/repo_groups/repo_group_edit.html'), | ||||
defaults=defaults, | ||||
encoding="UTF-8", | ||||
force_defaults=False | ||||
) | ||||
@HasRepoGroupPermissionAnyDecorator('group.admin') | ||||
def update_perms(self, group_name): | ||||
""" | ||||
Update permissions for given repository group | ||||
:param group_name: | ||||
""" | ||||
c.repo_group = RepoGroupModel()._get_repo_group(group_name) | ||||
valid_recursive_choices = ['none', 'repos', 'groups', 'all'] | ||||
form_result = RepoGroupPermsForm(valid_recursive_choices)().to_python(request.POST) | ||||
if not c.rhodecode_user.is_admin: | ||||
if self._revoke_perms_on_yourself(form_result): | ||||
msg = _('Cannot revoke permission for yourself as admin') | ||||
h.flash(msg, category='warning') | ||||
return redirect(url('edit_repo_group_perms', group_name=group_name)) | ||||
recursive = form_result['recursive'] | ||||
# iterate over all members(if in recursive mode) of this groups and | ||||
# set the permissions ! | ||||
# this can be potentially heavy operation | ||||
RepoGroupModel()._update_permissions(c.repo_group, | ||||
form_result['perms_new'], | ||||
form_result['perms_updates'], | ||||
recursive) | ||||
#TODO: implement this | ||||
#action_logger(self.rhodecode_user, 'admin_changed_repo_permissions', | ||||
# repo_name, self.ip_addr, self.sa) | ||||
Session().commit() | ||||
h.flash(_('Repository Group permissions updated'), category='success') | ||||
return redirect(url('edit_repo_group_perms', group_name=group_name)) | ||||
@HasRepoGroupPermissionAnyDecorator('group.admin') | ||||
def delete_perms(self, group_name): | ||||
""" | ||||
DELETE an existing repository group permission user | ||||
:param group_name: | ||||
""" | ||||
try: | ||||
obj_type = request.POST.get('obj_type') | ||||
obj_id = None | ||||
if obj_type == 'user': | ||||
obj_id = safe_int(request.POST.get('user_id')) | ||||
elif obj_type == 'user_group': | ||||
obj_id = safe_int(request.POST.get('user_group_id')) | ||||
if not c.rhodecode_user.is_admin: | ||||
if obj_type == 'user' and c.rhodecode_user.user_id == obj_id: | ||||
msg = _('Cannot revoke permission for yourself as admin') | ||||
h.flash(msg, category='warning') | ||||
raise Exception('revoke admin permission on self') | ||||
recursive = request.POST.get('recursive', 'none') | ||||
if obj_type == 'user': | ||||
RepoGroupModel().delete_permission(repo_group=group_name, | ||||
obj=obj_id, obj_type='user', | ||||
recursive=recursive) | ||||
elif obj_type == 'user_group': | ||||
RepoGroupModel().delete_permission(repo_group=group_name, | ||||
obj=obj_id, | ||||
obj_type='user_group', | ||||
recursive=recursive) | ||||
Session().commit() | ||||
except Exception: | ||||
log.error(traceback.format_exc()) | ||||
h.flash(_('An error occurred during revoking of permission'), | ||||
category='error') | ||||
raise HTTPInternalServerError() | ||||