##// END OF EJS Templates
tests: fixed test suite for celery adoption
tests: fixed test suite for celery adoption

File last commit:

r5242:f8424773 default
r5607:39b20522 default
Show More
repositories.py
253 lines | 9.3 KiB | text/x-python | PythonLexer
# Copyright (C) 2016-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
import formencode
import formencode.htmlfill
from pyramid.httpexceptions import HTTPFound, HTTPForbidden
from pyramid.renderers import render
from pyramid.response import Response
from sqlalchemy.orm import aliased
from rhodecode import events
from rhodecode.apps._base import BaseAppView, DataGridAppView
from rhodecode.lib.celerylib.utils import get_task_id
from rhodecode.lib.auth import (
LoginRequired, CSRFRequired, NotAnonymous,
HasPermissionAny, HasRepoGroupPermissionAny)
from rhodecode.lib import helpers as h
from rhodecode.lib.utils import repo_name_slug
from rhodecode.lib.utils2 import safe_int, safe_str
from rhodecode.model.forms import RepoForm
from rhodecode.model.permission import PermissionModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.scm import RepoList, RepoGroupList, ScmModel
from rhodecode.model.settings import SettingsModel
from rhodecode.model.db import (
in_filter_generator, or_, func, Session, Repository, RepoGroup, User)
log = logging.getLogger(__name__)
class AdminReposView(BaseAppView, DataGridAppView):
def load_default_context(self):
c = self._get_local_tmpl_context()
return c
def _load_form_data(self, c):
acl_groups = RepoGroupList(RepoGroup.query().all(),
perm_set=['group.write', 'group.admin'])
c.repo_groups = RepoGroup.groups_choices(groups=acl_groups)
c.repo_groups_choices = list(map(lambda k: safe_str(k[0]), c.repo_groups))
c.personal_repo_group = self._rhodecode_user.personal_repo_group
@LoginRequired()
@NotAnonymous()
# perms check inside
def repository_list(self):
c = self.load_default_context()
return self._get_template_context(c)
@LoginRequired()
@NotAnonymous()
# perms check inside
def repository_list_data(self):
self.load_default_context()
column_map = {
'name': 'repo_name',
'desc': 'description',
'last_change': 'updated_on',
'owner': 'user_username',
}
draw, start, limit = self._extract_chunk(self.request)
search_q, order_by, order_dir = self._extract_ordering(
self.request, column_map=column_map)
_perms = ['repository.admin']
allowed_ids = [-1] + self._rhodecode_user.repo_acl_ids_from_stack(_perms)
repos_data_total_count = Repository.query() \
.filter(or_(
# generate multiple IN to fix limitation problems
*in_filter_generator(Repository.repo_id, allowed_ids))
) \
.count()
RepoFork = aliased(Repository)
OwnerUser = aliased(User)
base_q = Session.query(
Repository.repo_id,
Repository.repo_name,
Repository.description,
Repository.repo_type,
Repository.repo_state,
Repository.private,
Repository.archived,
Repository.updated_on,
Repository._changeset_cache,
RepoFork.repo_name.label('fork_repo_name'),
OwnerUser.username.label('owner_username'),
) \
.filter(or_(
# generate multiple IN to fix limitation problems
*in_filter_generator(Repository.repo_id, allowed_ids))
) \
.outerjoin(RepoFork, Repository.fork_id == RepoFork.repo_id) \
.join(OwnerUser, Repository.user_id == OwnerUser.user_id)
if search_q:
like_expression = f'%{safe_str(search_q)}%'
base_q = base_q.filter(or_(
Repository.repo_name.ilike(like_expression),
))
#TODO: check if we need group_by here ?
#base_q = base_q.group_by(Repository, User)
repos_data_total_filtered_count = base_q.count()
sort_defined = False
if order_by == 'repo_name':
sort_col = func.lower(Repository.repo_name)
sort_defined = True
elif order_by == 'user_username':
sort_col = OwnerUser.username
else:
sort_col = getattr(Repository, order_by, None)
if sort_defined or sort_col:
if order_dir == 'asc':
sort_col = sort_col.asc()
else:
sort_col = sort_col.desc()
base_q = base_q.order_by(sort_col)
base_q = base_q.offset(start).limit(limit)
repos_list = base_q.all()
repos_data = RepoModel().get_repos_as_dict(
repo_list=repos_list, admin=True, super_user_actions=True)
data = ({
'draw': draw,
'data': repos_data,
'recordsTotal': repos_data_total_count,
'recordsFiltered': repos_data_total_filtered_count,
})
return data
@LoginRequired()
@NotAnonymous()
# perms check inside
def repository_new(self):
c = self.load_default_context()
new_repo = self.request.GET.get('repo', '')
parent_group_id = safe_int(self.request.GET.get('parent_group'))
_gr = RepoGroup.get(parent_group_id)
if not HasPermissionAny('hg.admin', 'hg.create.repository')():
# you're not super admin nor have global create permissions,
# but maybe you have at least write permission to a parent group ?
gr_name = _gr.group_name if _gr else None
# create repositories with write permission on group is set to true
create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
group_admin = HasRepoGroupPermissionAny('group.admin')(group_name=gr_name)
group_write = HasRepoGroupPermissionAny('group.write')(group_name=gr_name)
if not (group_admin or (group_write and create_on_write)):
raise HTTPForbidden()
self._load_form_data(c)
c.new_repo = repo_name_slug(new_repo)
# apply the defaults from defaults page
defaults = SettingsModel().get_default_repo_settings(strip_prefix=True)
# set checkbox to autochecked
defaults['repo_copy_permissions'] = True
parent_group_choice = '-1'
if not self._rhodecode_user.is_admin and self._rhodecode_user.personal_repo_group:
parent_group_choice = self._rhodecode_user.personal_repo_group
if parent_group_id and _gr:
if parent_group_id in [x[0] for x in c.repo_groups]:
parent_group_choice = safe_str(parent_group_id)
defaults.update({'repo_group': parent_group_choice})
data = render('rhodecode:templates/admin/repos/repo_add.mako',
self._get_template_context(c), self.request)
html = formencode.htmlfill.render(
data,
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
return Response(html)
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
# perms check inside
def repository_create(self):
c = self.load_default_context()
form_result = {}
self._load_form_data(c)
try:
# CanWriteToGroup validators checks permissions of this POST
form = RepoForm(
self.request.translate, repo_groups=c.repo_groups_choices)()
form_result = form.to_python(dict(self.request.POST))
copy_permissions = form_result.get('repo_copy_permissions')
# create is done sometimes async on celery, db transaction
# management is handled there.
task = RepoModel().create(form_result, self._rhodecode_user.user_id)
task_id = get_task_id(task)
except formencode.Invalid as errors:
data = render('rhodecode:templates/admin/repos/repo_add.mako',
self._get_template_context(c), self.request)
html = formencode.htmlfill.render(
data,
defaults=errors.value,
errors=errors.unpack_errors() or {},
prefix_error=False,
encoding="UTF-8",
force_defaults=False
)
return Response(html)
except Exception as e:
msg = self._log_creation_exception(e, form_result.get('repo_name'))
h.flash(msg, category='error')
raise HTTPFound(h.route_path('home'))
repo_name = form_result.get('repo_name_full')
PermissionModel().trigger_permission_flush()
raise HTTPFound(
h.route_path('repo_creating', repo_name=repo_name,
_query=dict(task_id=task_id)))