# -*- coding: utf-8 -*- # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from pyramid.view import view_config from pyramid.httpexceptions import HTTPUnprocessableEntity, HTTPNotFound from ziggurat_foundations.models.services.user import UserService from appenlight.lib.utils import permission_tuple_to_dict from appenlight.models.services.config import ConfigService from appenlight.models.group import Group from appenlight.models.services.group import GroupService from appenlight.models.user import User from appenlight.models import DBSession from appenlight import forms from webob.multidict import MultiDict log = logging.getLogger(__name__) _ = str @view_config( route_name="groups_no_id", renderer="json", request_method="GET", permission="authenticated", ) def groups_list(request): """ Returns groups list """ groups = Group.all().order_by(Group.group_name) list_groups = ConfigService.by_key_and_section( "list_groups_to_non_admins", "global" ) if list_groups.value or request.has_permission("root_administration"): return [g.get_dict() for g in groups] else: return [] @view_config( route_name="groups_no_id", renderer="json", request_method="POST", permission="root_administration", ) def groups_create(request): """ Returns groups list """ form = forms.GroupCreateForm( MultiDict(request.safe_json_body or {}), csrf_context=request ) if form.validate(): log.info("registering group") group = Group() # insert new group here DBSession.add(group) form.populate_obj(group) request.session.flash(_("Group created")) DBSession.flush() return group.get_dict(include_perms=True) else: return HTTPUnprocessableEntity(body=form.errors_json) @view_config( route_name="groups", renderer="json", request_method="DELETE", permission="root_administration", ) def groups_DELETE(request): """ Removes a groups permanently from db """ msg = _("You cannot remove administrator group from the system") group = GroupService.by_id(request.matchdict.get("group_id")) if group: if group.id == 1: request.session.flash(msg, "warning") else: DBSession.delete(group) request.session.flash(_("Group removed")) return True request.response.status = 422 return False @view_config( route_name="groups", renderer="json", request_method="GET", permission="root_administration", ) @view_config( route_name="groups", renderer="json", request_method="PATCH", permission="root_administration", ) def group_update(request): """ Updates group object """ group = GroupService.by_id(request.matchdict.get("group_id")) if not group: return HTTPNotFound() if request.method == "PATCH": form = forms.GroupCreateForm( MultiDict(request.unsafe_json_body), csrf_context=request ) form._modified_group = group if form.validate(): form.populate_obj(group) else: return HTTPUnprocessableEntity(body=form.errors_json) return group.get_dict(include_perms=True) @view_config( route_name="groups_property", match_param="key=resource_permissions", renderer="json", permission="root_administration", ) def groups_resource_permissions_list(request): """ Get list of permissions assigned to specific resources """ group = GroupService.by_id(request.matchdict.get("group_id")) if not group: return HTTPNotFound() return [ permission_tuple_to_dict(perm) for perm in GroupService.resources_with_possible_perms(group) ] @view_config( route_name="groups_property", match_param="key=users", request_method="GET", renderer="json", permission="root_administration", ) def groups_users_list(request): """ Get list of permissions assigned to specific resources """ group = GroupService.by_id(request.matchdict.get("group_id")) if not group: return HTTPNotFound() props = [ "user_name", "id", "first_name", "last_name", "email", "last_login_date", "status", ] users_dicts = [] for user in group.users: u_dict = user.get_dict(include_keys=props) u_dict["gravatar_url"] = UserService.gravatar_url(user, s=20) users_dicts.append(u_dict) return users_dicts @view_config( route_name="groups_property", match_param="key=users", request_method="DELETE", renderer="json", permission="root_administration", ) def groups_users_remove(request): """ Get list of permissions assigned to specific resources """ group = GroupService.by_id(request.matchdict.get("group_id")) user = UserService.by_user_name(request.GET.get("user_name")) if not group or not user: return HTTPNotFound() if len(group.users) > 1: group.users.remove(user) msg = "User removed from group" request.session.flash(msg) group.member_count = group.users_dynamic.count() return True msg = "Administrator group needs to contain at least one user" request.session.flash(msg, "warning") return False @view_config( route_name="groups_property", match_param="key=users", request_method="POST", renderer="json", permission="root_administration", ) def groups_users_add(request): """ Get list of permissions assigned to specific resources """ group = GroupService.by_id(request.matchdict.get("group_id")) user = UserService.by_user_name(request.unsafe_json_body.get("user_name")) if not user: user = UserService.by_email(request.unsafe_json_body.get("user_name")) if not group or not user: return HTTPNotFound() if user not in group.users: group.users.append(user) group.member_count = group.users_dynamic.count() props = [ "user_name", "id", "first_name", "last_name", "email", "last_login_date", "status", ] u_dict = user.get_dict(include_keys=props) u_dict["gravatar_url"] = UserService.gravatar_url(user, s=20) return u_dict