# Copyright (C) 2016-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import datetime import logging import time import formencode import formencode.htmlfill from pyramid.httpexceptions import HTTPFound, HTTPForbidden from pyramid.renderers import render from pyramid.response import Response from sqlalchemy.orm import aliased from rhodecode import events from rhodecode.apps._base import BaseAppView, DataGridAppView from rhodecode.lib.auth import ( LoginRequired, CSRFRequired, NotAnonymous, HasPermissionAny, HasRepoGroupPermissionAny) from rhodecode.lib import helpers as h, audit_logger from rhodecode.lib.str_utils import safe_int, safe_str from rhodecode.model.forms import RepoGroupForm from rhodecode.model.permission import PermissionModel from rhodecode.model.repo_group import RepoGroupModel from rhodecode.model.scm import RepoGroupList from rhodecode.model.db import ( or_, count, func, in_filter_generator, Session, RepoGroup, User, Repository) log = logging.getLogger(__name__) class AdminRepoGroupsView(BaseAppView, DataGridAppView): def load_default_context(self): c = self._get_local_tmpl_context() return c def _load_form_data(self, c): allow_empty_group = False if self._can_create_repo_group(): # we're global admin, we're ok and we can create TOP level groups allow_empty_group = True # override the choices for this form, we need to filter choices # and display only those we have ADMIN right groups_with_admin_rights = RepoGroupList( RepoGroup.query().all(), perm_set=['group.admin'], extra_kwargs=dict(user=self._rhodecode_user)) c.repo_groups = RepoGroup.groups_choices( groups=groups_with_admin_rights, show_empty_group=allow_empty_group) c.personal_repo_group = self._rhodecode_user.personal_repo_group def _can_create_repo_group(self, parent_group_id=None): is_admin = HasPermissionAny('hg.admin')('group create controller') create_repo_group = HasPermissionAny( 'hg.repogroup.create.true')('group create controller') if is_admin or (create_repo_group and not parent_group_id): # we're global admin, or we have global repo group create # permission # we're ok and we can create TOP level groups return True elif parent_group_id: # we check the permission if we can write to parent group group = RepoGroup.get(parent_group_id) group_name = group.group_name if group else None if HasRepoGroupPermissionAny('group.admin')( group_name, 'check if user is an admin of group'): # we're an admin of passed in group, we're ok. return True else: return False return False # permission check in data loading of # `repo_group_list_data` via RepoGroupList @LoginRequired() @NotAnonymous() def repo_group_list(self): c = self.load_default_context() return self._get_template_context(c) # permission check inside @LoginRequired() @NotAnonymous() def repo_group_list_data(self): self.load_default_context() column_map = { 'name': 'group_name_hash', 'desc': 'group_description', 'last_change': 'updated_on', 'top_level_repos': 'repos_total', 'owner': 'user_username', } draw, start, limit = self._extract_chunk(self.request) search_q, order_by, order_dir = self._extract_ordering( self.request, column_map=column_map) _render = self.request.get_partial_renderer( 'rhodecode:templates/data_table/_dt_elements.mako') c = _render.get_call_context() def quick_menu(repo_group_name): return _render('quick_repo_group_menu', repo_group_name) def repo_group_lnk(repo_group_name): return _render('repo_group_name', repo_group_name) def last_change(last_change): if isinstance(last_change, datetime.datetime) and not last_change.tzinfo: ts = time.time() utc_offset = (datetime.datetime.fromtimestamp(ts) - datetime.datetime.utcfromtimestamp(ts)).total_seconds() last_change = last_change + datetime.timedelta(seconds=utc_offset) return _render("last_change", last_change) def desc(desc, personal): return _render( 'repo_group_desc', desc, personal, c.visual.stylify_metatags) def repo_group_actions(repo_group_id, repo_group_name, gr_count): return _render( 'repo_group_actions', repo_group_id, repo_group_name, gr_count) def user_profile(username): return _render('user_profile', username) _perms = ['group.admin'] allowed_ids = [-1] + self._rhodecode_user.repo_group_acl_ids_from_stack(_perms) repo_groups_data_total_count = RepoGroup.query()\ .filter(or_( # generate multiple IN to fix limitation problems *in_filter_generator(RepoGroup.group_id, allowed_ids) )) \ .count() repo_groups_data_total_inactive_count = RepoGroup.query()\ .filter(RepoGroup.group_id.in_(allowed_ids))\ .count() repo_count = count(Repository.repo_id) OwnerUser = aliased(User) base_q = Session.query( RepoGroup.group_name, RepoGroup.group_name_hash, RepoGroup.group_description, RepoGroup.group_id, RepoGroup.personal, RepoGroup.updated_on, OwnerUser.username.label('owner_username'), repo_count.label('repos_count') ) \ .filter(or_( # generate multiple IN to fix limitation problems *in_filter_generator(RepoGroup.group_id, allowed_ids) )) \ .outerjoin(Repository, RepoGroup.group_id == Repository.group_id) \ .join(OwnerUser, RepoGroup.user_id == OwnerUser.user_id) base_q = base_q.group_by(RepoGroup, OwnerUser) if search_q: like_expression = f'%{safe_str(search_q)}%' base_q = base_q.filter(or_( RepoGroup.group_name.ilike(like_expression), )) repo_groups_data_total_filtered_count = base_q.count() # the inactive isn't really used, but we still make it same as other data grids # which use inactive (users,user groups) repo_groups_data_total_filtered_inactive_count = repo_groups_data_total_filtered_count sort_defined = False if order_by == 'group_name': sort_col = func.lower(RepoGroup.group_name) sort_defined = True elif order_by == 'repos_total': sort_col = repo_count sort_defined = True elif order_by == 'user_username': sort_col = OwnerUser.username else: sort_col = getattr(RepoGroup, order_by, None) if sort_defined or sort_col: if order_dir == 'asc': sort_col = sort_col.asc() else: sort_col = sort_col.desc() base_q = base_q.order_by(sort_col) base_q = base_q.offset(start).limit(limit) # authenticated access to user groups auth_repo_group_list = base_q.all() repo_groups_data = [] for repo_gr in auth_repo_group_list: row = { "menu": quick_menu(repo_gr.group_name), "name": repo_group_lnk(repo_gr.group_name), "last_change": last_change(repo_gr.updated_on), "last_changeset": "", "last_changeset_raw": "", "desc": desc(repo_gr.group_description, repo_gr.personal), "owner": user_profile(repo_gr.owner_username), "top_level_repos": repo_gr.repos_count, "action": repo_group_actions( repo_gr.group_id, repo_gr.group_name, repo_gr.repos_count), } repo_groups_data.append(row) data = ({ 'draw': draw, 'data': repo_groups_data, 'recordsTotal': repo_groups_data_total_count, 'recordsTotalInactive': repo_groups_data_total_inactive_count, 'recordsFiltered': repo_groups_data_total_filtered_count, 'recordsFilteredInactive': repo_groups_data_total_filtered_inactive_count, }) return data @LoginRequired() @NotAnonymous() # perm checks inside def repo_group_new(self): c = self.load_default_context() # perm check for admin, create_group perm or admin of parent_group parent_group_id = safe_int(self.request.GET.get('parent_group')) _gr = RepoGroup.get(parent_group_id) if not self._can_create_repo_group(parent_group_id): raise HTTPForbidden() self._load_form_data(c) defaults = {} # Future proof for default of repo group parent_group_choice = '-1' if not self._rhodecode_user.is_admin and self._rhodecode_user.personal_repo_group: parent_group_choice = self._rhodecode_user.personal_repo_group if parent_group_id and _gr: if parent_group_id in [x[0] for x in c.repo_groups]: parent_group_choice = safe_str(parent_group_id) defaults.update({'group_parent_id': parent_group_choice}) data = render( 'rhodecode:templates/admin/repo_groups/repo_group_add.mako', self._get_template_context(c), self.request) html = formencode.htmlfill.render( data, defaults=defaults, encoding="UTF-8", force_defaults=False ) return Response(html) @LoginRequired() @NotAnonymous() @CSRFRequired() # perm checks inside def repo_group_create(self): c = self.load_default_context() _ = self.request.translate parent_group_id = safe_int(self.request.POST.get('group_parent_id')) can_create = self._can_create_repo_group(parent_group_id) self._load_form_data(c) # permissions for can create group based on parent_id are checked # here in the Form available_groups = list(map(lambda k: safe_str(k[0]), c.repo_groups)) repo_group_form = RepoGroupForm( self.request.translate, available_groups=available_groups, can_create_in_root=can_create)() repo_group_name = self.request.POST.get('group_name') try: owner = self._rhodecode_user form_result = repo_group_form.to_python(dict(self.request.POST)) copy_permissions = form_result.get('group_copy_permissions') repo_group = RepoGroupModel().create( group_name=form_result['group_name_full'], group_description=form_result['group_description'], owner=owner.user_id, copy_permissions=form_result['group_copy_permissions'] ) Session().flush() repo_group_data = repo_group.get_api_data() audit_logger.store_web( 'repo_group.create', action_data={'data': repo_group_data}, user=self._rhodecode_user) Session().commit() _new_group_name = form_result['group_name_full'] repo_group_url = h.link_to( _new_group_name, h.route_path('repo_group_home', repo_group_name=_new_group_name)) h.flash(h.literal(_('Created repository group %s') % repo_group_url), category='success') except formencode.Invalid as errors: data = render( 'rhodecode:templates/admin/repo_groups/repo_group_add.mako', self._get_template_context(c), self.request) html = formencode.htmlfill.render( data, defaults=errors.value, errors=errors.unpack_errors() or {}, prefix_error=False, encoding="UTF-8", force_defaults=False ) return Response(html) except Exception: log.exception("Exception during creation of repository group") h.flash(_('Error occurred during creation of repository group %s') % repo_group_name, category='error') raise HTTPFound(h.route_path('home')) PermissionModel().trigger_permission_flush() raise HTTPFound( h.route_path('repo_group_home', repo_group_name=form_result['group_name_full']))