##// END OF EJS Templates
release: Finish preparation for 4.16.0
release: Finish preparation for 4.16.0

File last commit:

r3363:f08e98b1 default
r3500:79774464 v4.16.0 stable
Show More
repo_schema.py
430 lines | 14.8 KiB | text/x-python | PythonLexer
gists: use colander schema to validate input data....
r523 # -*- coding: utf-8 -*-
docs: updated copyrights to 2019
r3363 # Copyright (C) 2016-2019 RhodeCode GmbH
gists: use colander schema to validate input data....
r523 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import colander
repo-schema: made changes to allow deform usage.
r1718 import deform.widget
gists: use colander schema to validate input data....
r523
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 from rhodecode.translation import _
repo-schema: made changes to allow deform usage.
r1718 from rhodecode.model.validation_schema.utils import convert_to_optgroup
gists: use colander schema to validate input data....
r523 from rhodecode.model.validation_schema import validators, preparers, types
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 DEFAULT_LANDING_REF = 'rev:tip'
def get_group_and_repo(repo_name):
from rhodecode.model.repo_group import RepoGroupModel
return RepoGroupModel()._get_group_name_and_parent(
repo_name, get_object=True)
repo-schema: made changes to allow deform usage.
r1718 def get_repo_group(repo_group_id):
from rhodecode.model.repo_group import RepoGroup
return RepoGroup.get(repo_group_id), RepoGroup.CHOICES_SEPARATOR
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 @colander.deferred
def deferred_repo_type_validator(node, kw):
options = kw.get('repo_type_options', [])
return colander.OneOf([x for x in options])
@colander.deferred
def deferred_repo_owner_validator(node, kw):
def repo_owner_validator(node, value):
from rhodecode.model.db import User
existing = User.get_by_username(value)
if not existing:
msg = _(u'Repo owner with id `{}` does not exists').format(value)
raise colander.Invalid(node, msg)
return repo_owner_validator
@colander.deferred
def deferred_landing_ref_validator(node, kw):
repo-schema: made changes to allow deform usage.
r1718 options = kw.get(
'repo_ref_options', [DEFAULT_LANDING_REF])
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 return colander.OneOf([x for x in options])
@colander.deferred
repositories: allow properly updating repository push url.
r2562 def deferred_sync_uri_validator(node, kw):
repo-schema: added a custom schema for repo settings....
r1719 repo_type = kw.get('repo_type')
validator = validators.CloneUriValidator(repo_type)
return validator
@colander.deferred
repo-schema: made changes to allow deform usage.
r1718 def deferred_landing_ref_widget(node, kw):
items = kw.get(
'repo_ref_items', [(DEFAULT_LANDING_REF, DEFAULT_LANDING_REF)])
items = convert_to_optgroup(items)
return deform.widget.Select2Widget(values=items)
@colander.deferred
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 def deferred_fork_of_validator(node, kw):
old_values = kw.get('old_values') or {}
def fork_of_validator(node, value):
from rhodecode.model.db import Repository, RepoGroup
existing = Repository.get_by_repo_name(value)
if not existing:
msg = _(u'Fork with id `{}` does not exists').format(value)
raise colander.Invalid(node, msg)
elif old_values['repo_name'] == existing.repo_name:
msg = _(u'Cannot set fork of '
u'parameter of this repository to itself').format(value)
raise colander.Invalid(node, msg)
return fork_of_validator
@colander.deferred
def deferred_can_write_to_group_validator(node, kw):
request_user = kw.get('user')
old_values = kw.get('old_values') or {}
def can_write_to_group_validator(node, value):
"""
Checks if given repo path is writable by user. This includes checks if
user is allowed to create repositories under root path or under
repo group paths
"""
from rhodecode.lib.auth import (
HasPermissionAny, HasRepoGroupPermissionAny)
from rhodecode.model.repo_group import RepoGroupModel
messages = {
'invalid_repo_group':
_(u"Repository group `{}` does not exist"),
# permissions denied we expose as not existing, to prevent
# resource discovery
'permission_denied':
_(u"Repository group `{}` does not exist"),
'permission_denied_root':
_(u"You do not have the permission to store "
u"repositories in the root location.")
}
value = value['repo_group_name']
is_root_location = value is types.RootLocation
# NOT initialized validators, we must call them
can_create_repos_at_root = HasPermissionAny(
'hg.admin', 'hg.create.repository')
# if values is root location, we simply need to check if we can write
# to root location !
if is_root_location:
if can_create_repos_at_root(user=request_user):
# we can create repo group inside tool-level. No more checks
# are required
return
else:
# "fake" node name as repo_name, otherwise we oddly report
# the error as if it was coming form repo_group
# however repo_group is empty when using root location.
node.name = 'repo_name'
raise colander.Invalid(node, messages['permission_denied_root'])
# parent group not exists ? throw an error
repo_group = RepoGroupModel().get_by_group_name(value)
if value and not repo_group:
raise colander.Invalid(
node, messages['invalid_repo_group'].format(value))
gr_name = repo_group.group_name
# create repositories with write permission on group is set to true
create_on_write = HasPermissionAny(
'hg.create.write_on_repogroup.true')(user=request_user)
group_admin = HasRepoGroupPermissionAny('group.admin')(
gr_name, 'can write into group validator', user=request_user)
group_write = HasRepoGroupPermissionAny('group.write')(
gr_name, 'can write into group validator', user=request_user)
forbidden = not (group_admin or (group_write and create_on_write))
# TODO: handling of old values, and detecting no-change in path
# to skip permission checks in such cases. This only needs to be
# implemented if we use this schema in forms as well
# gid = (old_data['repo_group'].get('group_id')
# if (old_data and 'repo_group' in old_data) else None)
# value_changed = gid != safe_int(value)
# new = not old_data
# do check if we changed the value, there's a case that someone got
# revoked write permissions to a repository, he still created, we
# don't need to check permission if he didn't change the value of
# groups in form box
# if value_changed or new:
# # parent group need to be existing
# TODO: ENDS HERE
if repo_group and forbidden:
msg = messages['permission_denied'].format(value)
raise colander.Invalid(node, msg)
return can_write_to_group_validator
gists: use colander schema to validate input data....
r523
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 @colander.deferred
def deferred_unique_name_validator(node, kw):
request_user = kw.get('user')
old_values = kw.get('old_values') or {}
def unique_name_validator(node, value):
from rhodecode.model.db import Repository, RepoGroup
name_changed = value != old_values.get('repo_name')
existing = Repository.get_by_repo_name(value)
if name_changed and existing:
msg = _(u'Repository with name `{}` already exists').format(value)
raise colander.Invalid(node, msg)
existing_group = RepoGroup.get_by_group_name(value)
if name_changed and existing_group:
msg = _(u'Repository group with name `{}` already exists').format(
value)
raise colander.Invalid(node, msg)
return unique_name_validator
@colander.deferred
def deferred_repo_name_validator(node, kw):
repo-creation: validate and forbid creating .git suffixed repository names....
r1644 def no_git_suffix_validator(node, value):
if value.endswith('.git'):
msg = _('Repository name cannot end with .git')
raise colander.Invalid(node, msg)
return colander.All(
no_git_suffix_validator, validators.valid_name_validator)
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153
repo-schema: made changes to allow deform usage.
r1718 @colander.deferred
def deferred_repo_group_validator(node, kw):
options = kw.get(
'repo_repo_group_options')
return colander.OneOf([x for x in options])
@colander.deferred
def deferred_repo_group_widget(node, kw):
items = kw.get('repo_repo_group_items')
return deform.widget.Select2Widget(values=items)
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 class GroupType(colander.Mapping):
def _validate(self, node, value):
try:
return dict(repo_group_name=value)
except Exception as e:
raise colander.Invalid(
node, '"${val}" is not a mapping type: ${err}'.format(
val=value, err=e))
def deserialize(self, node, cstruct):
if cstruct is colander.null:
return cstruct
appstruct = super(GroupType, self).deserialize(node, cstruct)
validated_name = appstruct['repo_group_name']
# inject group based on once deserialized data
(repo_name_without_group,
parent_group_name,
parent_group) = get_group_and_repo(validated_name)
repo-schema: made changes to allow deform usage.
r1718 appstruct['repo_name_with_group'] = validated_name
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 appstruct['repo_name_without_group'] = repo_name_without_group
appstruct['repo_group_name'] = parent_group_name or types.RootLocation
repo-schema: made changes to allow deform usage.
r1718
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 if parent_group:
appstruct['repo_group_id'] = parent_group.group_id
return appstruct
class GroupSchema(colander.SchemaNode):
schema_type = GroupType
validator = deferred_can_write_to_group_validator
missing = colander.null
class RepoGroup(GroupSchema):
repo_group_name = colander.SchemaNode(
types.GroupNameType())
repo_group_id = colander.SchemaNode(
colander.String(), missing=None)
repo_name_without_group = colander.SchemaNode(
colander.String(), missing=None)
class RepoGroupAccessSchema(colander.MappingSchema):
repo_group = RepoGroup()
class RepoNameUniqueSchema(colander.MappingSchema):
unique_repo_name = colander.SchemaNode(
colander.String(),
validator=deferred_unique_name_validator)
class RepoSchema(colander.MappingSchema):
repo_name = colander.SchemaNode(
types.RepoNameType(),
validator=deferred_repo_name_validator)
repo_type = colander.SchemaNode(
colander.String(),
validator=deferred_repo_type_validator)
repo_owner = colander.SchemaNode(
colander.String(),
repo-schema: made changes to allow deform usage.
r1718 validator=deferred_repo_owner_validator,
widget=deform.widget.TextInputWidget())
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153
repo_description = colander.SchemaNode(
repo-schema: made changes to allow deform usage.
r1718 colander.String(), missing='',
widget=deform.widget.TextAreaWidget())
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153
repo_landing_commit_ref = colander.SchemaNode(
colander.String(),
validator=deferred_landing_ref_validator,
preparers=[preparers.strip_preparer],
repo-schema: made changes to allow deform usage.
r1718 missing=DEFAULT_LANDING_REF,
widget=deferred_landing_ref_widget)
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153
repo_clone_uri = colander.SchemaNode(
colander.String(),
api: security, fix problem when absolute paths are specified with API call, that would allow...
r2663 validator=deferred_sync_uri_validator,
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 preparers=[preparers.strip_preparer],
missing='')
repositories: allow properly updating repository push url.
r2562 repo_push_uri = colander.SchemaNode(
colander.String(),
api: security, fix problem when absolute paths are specified with API call, that would allow...
r2663 validator=deferred_sync_uri_validator,
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 preparers=[preparers.strip_preparer],
missing='')
repo_fork_of = colander.SchemaNode(
colander.String(),
validator=deferred_fork_of_validator,
missing=None)
repo_private = colander.SchemaNode(
types.StringBooleanType(),
repo-schema: made changes to allow deform usage.
r1718 missing=False, widget=deform.widget.CheckboxWidget())
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 repo_copy_permissions = colander.SchemaNode(
types.StringBooleanType(),
repo-schema: made changes to allow deform usage.
r1718 missing=False, widget=deform.widget.CheckboxWidget())
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 repo_enable_statistics = colander.SchemaNode(
types.StringBooleanType(),
repo-schema: made changes to allow deform usage.
r1718 missing=False, widget=deform.widget.CheckboxWidget())
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 repo_enable_downloads = colander.SchemaNode(
types.StringBooleanType(),
repo-schema: made changes to allow deform usage.
r1718 missing=False, widget=deform.widget.CheckboxWidget())
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153 repo_enable_locking = colander.SchemaNode(
types.StringBooleanType(),
repo-schema: made changes to allow deform usage.
r1718 missing=False, widget=deform.widget.CheckboxWidget())
repo-schemas: refactor repository schemas and use it in API update/create functions....
r1153
def deserialize(self, cstruct):
"""
Custom deserialize that allows to chain validation, and verify
permissions, and as last step uniqueness
"""
# first pass, to validate given data
appstruct = super(RepoSchema, self).deserialize(cstruct)
validated_name = appstruct['repo_name']
# second pass to validate permissions to repo_group
second = RepoGroupAccessSchema().bind(**self.bindings)
appstruct_second = second.deserialize({'repo_group': validated_name})
# save result
appstruct['repo_group'] = appstruct_second['repo_group']
# thirds to validate uniqueness
third = RepoNameUniqueSchema().bind(**self.bindings)
third.deserialize({'unique_repo_name': validated_name})
return appstruct
repo-schema: added a custom schema for repo settings....
r1719
class RepoSettingsSchema(RepoSchema):
repo_group = colander.SchemaNode(
colander.Integer(),
validator=deferred_repo_group_validator,
widget=deferred_repo_group_widget,
missing='')
repo_clone_uri_change = colander.SchemaNode(
colander.String(),
missing='NEW')
repo_clone_uri = colander.SchemaNode(
colander.String(),
preparers=[preparers.strip_preparer],
repositories: allow properly updating repository push url.
r2562 validator=deferred_sync_uri_validator,
missing='')
repo_push_uri_change = colander.SchemaNode(
colander.String(),
missing='NEW')
repo_push_uri = colander.SchemaNode(
colander.String(),
preparers=[preparers.strip_preparer],
validator=deferred_sync_uri_validator,
repo-schema: added a custom schema for repo settings....
r1719 missing='')
def deserialize(self, cstruct):
"""
Custom deserialize that allows to chain validation, and verify
permissions, and as last step uniqueness
"""
# first pass, to validate given data
appstruct = super(RepoSchema, self).deserialize(cstruct)
validated_name = appstruct['repo_name']
# because of repoSchema adds repo-group as an ID, we inject it as
# full name here because validators require it, it's unwrapped later
# so it's safe to use and final name is going to be without group anyway
group, separator = get_repo_group(appstruct['repo_group'])
if group:
validated_name = separator.join([group.group_name, validated_name])
# second pass to validate permissions to repo_group
second = RepoGroupAccessSchema().bind(**self.bindings)
appstruct_second = second.deserialize({'repo_group': validated_name})
# save result
appstruct['repo_group'] = appstruct_second['repo_group']
# thirds to validate uniqueness
third = RepoNameUniqueSchema().bind(**self.bindings)
third.deserialize({'unique_repo_name': validated_name})
return appstruct