integration_schema.py
221 lines
| 7.4 KiB
| text/x-python
|
PythonLexer
# Copyright (C) 2016-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os | ||||
import deform | ||||
import colander | ||||
from rhodecode.translation import _ | ||||
from rhodecode.model.db import Repository, RepoGroup | ||||
from rhodecode.model.validation_schema import validators, preparers | ||||
def integration_scope_choices(permissions):
    """
    Return list of (value, label) choices for integration scopes depending on
    the permissions

    :param permissions: mapping read via the keys ``'global'``,
        ``'repositories'`` and ``'repositories_groups'``; the latter two are
        iterated as ``name -> permission`` mappings
    :return: list of ``(value, label)`` tuples, starting with the empty
        "Pick a scope:" entry, followed by the global entries (only when
        ``'hg.admin'`` is among the global permissions) and then the
        repository / repository group entries sorted by name
    """
    result = [('', _('Pick a scope:'))]
    if 'hg.admin' in permissions['global']:
        result.extend([
            ('global', _('Global (all repositories)')),
            ('root-repos', _('Top level repositories only')),
        ])

    repo_choices = [
        ('repo:%s' % repo_name, '/' + repo_name)
        for repo_name, repo_perm in permissions['repositories'].items()
        if repo_perm == 'repository.admin'
    ]

    # Filter the administrable groups once; both group scopes are built
    # from the same set of names.
    admin_group_names = [
        repo_group_name
        for repo_group_name, repo_group_perm
        in permissions['repositories_groups'].items()
        if repo_group_perm == 'group.admin'
    ]
    repogroup_choices = [
        ('repogroup:%s' % repo_group_name,
         '/' + repo_group_name + '/ (child repos only)')
        for repo_group_name in admin_group_names
    ]
    repogroup_recursive_choices = [
        ('repogroup-recursive:%s' % repo_group_name,
         '/' + repo_group_name + '/ (recursive)')
        for repo_group_name in admin_group_names
    ]

    # Sort by the name after the "<kind>:" prefix. The sort is stable, so for
    # identical names the concatenation order below (recursive, child-only,
    # repo) is what ends up in the result.
    scoped_choices = (
        repogroup_recursive_choices + repogroup_choices + repo_choices)
    scoped_choices.sort(key=lambda choice: choice[0].split(':', 1)[1])
    result.extend(scoped_choices)
    return result
@colander.deferred
def deferred_integration_scopes_validator(node, kw):
    """
    Build the validator for the integration scope node at bind time.

    :param node: the schema node being bound (unused here)
    :param kw: bind keyword arguments; ``kw['permissions']`` is read and
        later indexed with ``'global'``, ``'repositories'`` and
        ``'repositories_groups'``
    :return: a validator callable ``(node, scope)`` that returns ``True``
        when the scope is allowed and raises ``colander.Invalid`` otherwise
    """
    perms = kw.get('permissions')

    def _scope_validator(_node, scope):
        # Super admins may create integrations in every scope.
        is_super_admin = 'hg.admin' in perms['global']

        repo = scope.get('repo')
        if repo:
            repo_perm_ok = is_super_admin or (
                perms['repositories'].get(repo.repo_name)
                == 'repository.admin')
            if not repo_perm_ok:
                raise colander.Invalid(
                    _node, _('Only repo admins can create integrations'))
            return True

        repo_group = scope.get('repo_group')
        if repo_group:
            group_perm_ok = is_super_admin or (
                perms['repositories_groups'].get(repo_group.group_name)
                == 'group.admin')
            if not group_perm_ok:
                raise colander.Invalid(
                    _node,
                    _('Only repogroup admins can create integrations'))
            return True

        # Neither a repo nor a repo group is set: this is a global scope.
        if not is_super_admin:
            raise colander.Invalid(
                _node, _('Only superadmins can create global integrations'))
        return True

    return _scope_validator
@colander.deferred
def deferred_integration_scopes_widget(node, kw):
    """
    Pick the widget for the integration scope node at bind time.

    :param node: the schema node being bound (unused here)
    :param kw: bind keyword arguments; ``no_scope`` and ``permissions``
        are read
    :return: a read-only text input when ``no_scope`` is truthy, otherwise
        a select2 widget filled from ``integration_scope_choices``
    """
    if not kw.get('no_scope'):
        scope_choices = integration_scope_choices(kw.get('permissions'))
        return deform.widget.Select2Widget(values=scope_choices)
    return deform.widget.TextInputWidget(readonly=True)
r793 | ||||
class IntegrationScopeType(colander.SchemaType):
    """
    Colander type converting between the scope string used in forms and a
    scope dict with the keys ``repo``, ``repo_group`` and
    ``child_repos_only``.

    String forms handled: ``repo:<name>``, ``repogroup:<name>``,
    ``repogroup-recursive:<name>``, ``global`` and ``root-repos``.
    """

    def serialize(self, node, appstruct):
        """
        Turn a scope dict into its string form.

        :param node: the schema node (unused here)
        :param appstruct: scope dict or ``colander.null``
        :return: the scope string, or ``colander.null`` for a null appstruct
        """
        if appstruct is colander.null:
            return colander.null

        if appstruct.get('repo'):
            return 'repo:%s' % appstruct['repo'].repo_name
        elif appstruct.get('repo_group'):
            if appstruct.get('child_repos_only'):
                return 'repogroup:%s' % appstruct['repo_group'].group_name
            else:
                return 'repogroup-recursive:%s' % (
                    appstruct['repo_group'].group_name)
        else:
            if appstruct.get('child_repos_only'):
                return 'root-repos'
            else:
                return 'global'

    def deserialize(self, node, cstruct):
        """
        Turn a scope string into a scope dict.

        :param node: the schema node, attached to a raised error
        :param cstruct: scope string or ``colander.null``
        :return: dict with ``repo``, ``repo_group`` and ``child_repos_only``
            keys, or ``colander.null`` for a null cstruct
        :raises colander.Invalid: when the value is not a string, is not a
            known scope form, or names a repository / repository group that
            the lookup does not find
        """
        if cstruct is colander.null:
            return colander.null

        # A non-string value must end up as a validation error, not as an
        # AttributeError from the string handling below.
        if not isinstance(cstruct, str):
            raise colander.Invalid(
                node, '%r is not a valid scope' % (cstruct,))

        # Split on the first ':' only, so the whole remainder is the name;
        # this mirrors how the scope choices parse the same values.
        kind, sep, name = cstruct.partition(':')

        if sep:
            if kind == 'repo':
                repo = Repository.get_by_repo_name(name)
                if repo:
                    return {
                        'repo': repo,
                        'repo_group': None,
                        'child_repos_only': False,
                    }
            elif kind in ('repogroup-recursive', 'repogroup'):
                repo_group = RepoGroup.get_by_group_name(name)
                if repo_group:
                    return {
                        'repo': None,
                        'repo_group': repo_group,
                        # plain "repogroup:" means direct children only
                        'child_repos_only': kind == 'repogroup',
                    }
        elif cstruct == 'global':
            return {
                'repo': None,
                'repo_group': None,
                'child_repos_only': False,
            }
        elif cstruct == 'root-repos':
            return {
                'repo': None,
                'repo_group': None,
                'child_repos_only': True,
            }

        raise colander.Invalid(node, '%r is not a valid scope' % (cstruct,))
r793 | ||||
class IntegrationOptionsSchemaBase(colander.MappingSchema):
    """
    General options of an integration: its name, its scope and whether it
    is enabled.
    """

    # Required free-text name.
    name = colander.SchemaNode(
        colander.String(),
        title=_('Integration name'),
        description=_('Short name for this integration.'),
        missing=colander.required,
    )

    # Required scope; validator and widget are resolved at bind time.
    scope = colander.SchemaNode(
        IntegrationScopeType(),
        title=_('Integration scope'),
        description=_(
            'Scope of the integration. Recursive means the integration '
            ' runs on all repos of that group and children recursively.'),
        widget=deferred_integration_scopes_widget,
        validator=deferred_integration_scopes_validator,
        missing=colander.required,
    )

    # Defaults to True when serialized, falls back to False when missing
    # on deserialization.
    enabled = colander.SchemaNode(
        colander.Bool(),
        title=_('Enabled'),
        description=_('Enable or disable this integration.'),
        default=True,
        missing=False,
    )
def make_integration_schema(IntegrationType, settings=None):
    """
    Return a colander schema for an integration type

    The schema has an ``options`` child (the general integration options)
    and a ``settings`` child taken from the integration type's own
    ``settings_schema()``.

    :param IntegrationType: the integration type class
    :param settings: existing integration settings dict (optional)
    """
    if not settings:
        settings = {}

    integration = IntegrationType(settings=settings)
    type_settings_schema = integration.settings_schema()

    class IntegrationSchema(colander.Schema):
        options = IntegrationOptionsSchemaBase()

    # The type-specific schema is attached under the fixed name "settings".
    type_settings_schema.name = 'settings'
    type_settings_schema.title = _('{integration_type} settings').format(
        integration_type=IntegrationType.display_name)

    full_schema = IntegrationSchema()
    full_schema['options'].title = _('General integration options')
    full_schema.add(type_settings_schema)
    return full_schema