# Copyright (C) 2016-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import deform import colander from rhodecode.translation import _ from rhodecode.model.db import Repository, RepoGroup from rhodecode.model.validation_schema import validators, preparers def integration_scope_choices(permissions): """ Return list of (value, label) choices for integration scopes depending on the permissions """ result = [('', _('Pick a scope:'))] if 'hg.admin' in permissions['global']: result.extend([ ('global', _('Global (all repositories)')), ('root-repos', _('Top level repositories only')), ]) repo_choices = [ ('repo:%s' % repo_name, '/' + repo_name) for repo_name, repo_perm in list(permissions['repositories'].items()) if repo_perm == 'repository.admin' ] repogroup_choices = [ ('repogroup:%s' % repo_group_name, '/' + repo_group_name + '/ (child repos only)') for repo_group_name, repo_group_perm in list(permissions['repositories_groups'].items()) if repo_group_perm == 'group.admin' ] repogroup_recursive_choices = [ ('repogroup-recursive:%s' % repo_group_name, '/' + repo_group_name + '/ (recursive)') for repo_group_name, repo_group_perm in list(permissions['repositories_groups'].items()) if repo_group_perm == 'group.admin' ] result.extend( sorted(repogroup_recursive_choices + repogroup_choices + repo_choices, key=lambda choice_label: choice_label[0].split(':', 1)[1] ) ) return result @colander.deferred def deferred_integration_scopes_validator(node, kw): perms = kw.get('permissions') def _scope_validator(_node, scope): is_super_admin = 'hg.admin' in perms['global'] if scope.get('repo'): if (is_super_admin or perms['repositories'].get( scope['repo'].repo_name) == 'repository.admin'): return True msg = _('Only repo admins can create integrations') raise colander.Invalid(_node, msg) elif scope.get('repo_group'): if (is_super_admin or perms['repositories_groups'].get( scope['repo_group'].group_name) == 'group.admin'): return True msg = _('Only repogroup admins can create integrations') raise colander.Invalid(_node, msg) else: if is_super_admin: return True msg = _('Only superadmins can create global integrations') raise colander.Invalid(_node, msg) return _scope_validator @colander.deferred def deferred_integration_scopes_widget(node, kw): if kw.get('no_scope'): return deform.widget.TextInputWidget(readonly=True) choices = integration_scope_choices(kw.get('permissions')) widget = deform.widget.Select2Widget(values=choices) return widget class IntegrationScopeType(colander.SchemaType): def serialize(self, node, appstruct): if appstruct is colander.null: return colander.null if appstruct.get('repo'): return 'repo:%s' % appstruct['repo'].repo_name elif appstruct.get('repo_group'): if appstruct.get('child_repos_only'): return 'repogroup:%s' % appstruct['repo_group'].group_name else: return 'repogroup-recursive:%s' % ( appstruct['repo_group'].group_name) else: if appstruct.get('child_repos_only'): return 'root-repos' else: return 'global' def deserialize(self, node, cstruct): if cstruct is colander.null: return colander.null if cstruct.startswith('repo:'): repo = Repository.get_by_repo_name(cstruct.split(':')[1]) if repo: return { 'repo': repo, 'repo_group': None, 'child_repos_only': False, } elif cstruct.startswith('repogroup-recursive:'): repo_group = RepoGroup.get_by_group_name(cstruct.split(':')[1]) if repo_group: return { 'repo': None, 'repo_group': repo_group, 'child_repos_only': False } elif cstruct.startswith('repogroup:'): repo_group = RepoGroup.get_by_group_name(cstruct.split(':')[1]) if repo_group: return { 'repo': None, 'repo_group': repo_group, 'child_repos_only': True } elif cstruct == 'global': return { 'repo': None, 'repo_group': None, 'child_repos_only': False } elif cstruct == 'root-repos': return { 'repo': None, 'repo_group': None, 'child_repos_only': True } raise colander.Invalid(node, '%r is not a valid scope' % cstruct) class IntegrationOptionsSchemaBase(colander.MappingSchema): name = colander.SchemaNode( colander.String(), description=_('Short name for this integration.'), missing=colander.required, title=_('Integration name'), ) scope = colander.SchemaNode( IntegrationScopeType(), description=_( 'Scope of the integration. Recursive means the integration ' ' runs on all repos of that group and children recursively.'), title=_('Integration scope'), validator=deferred_integration_scopes_validator, widget=deferred_integration_scopes_widget, missing=colander.required, ) enabled = colander.SchemaNode( colander.Bool(), default=True, description=_('Enable or disable this integration.'), missing=False, title=_('Enabled'), ) def make_integration_schema(IntegrationType, settings=None): """ Return a colander schema for an integration type :param IntegrationType: the integration type class :param settings: existing integration settings dict (optional) """ settings = settings or {} settings_schema = IntegrationType(settings=settings).settings_schema() class IntegrationSchema(colander.Schema): options = IntegrationOptionsSchemaBase() schema = IntegrationSchema() schema['options'].title = _('General integration options') settings_schema.name = 'settings' settings_schema.title = _('{integration_type} settings').format( integration_type=IntegrationType.display_name) schema.add(settings_schema) return schema