validators.py
160 lines
| 5.5 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2011-2023 RhodeCode GmbH | |||
r1719 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r821 | import re | |||
r1719 | import logging | |||
r523 | import ipaddress | |||
import colander | ||||
from rhodecode.translation import _ | ||||
r5066 | from rhodecode.lib.utils2 import glob2re | |||
from rhodecode.lib.str_utils import safe_str | ||||
r5137 | from rhodecode.lib.ext_json import json, sjson | |||
r523 | ||||
# module-level logger, named after this module
log = logging.getLogger(__name__)
r523 | ||||
def ip_addr_validator(node, value):
    """
    Colander validator accepting an IPv4/IPv6 address or network.

    The value is passed through ``safe_str`` and parsed with
    ``ipaddress.ip_network(..., strict=False)``, so a network written with
    host bits set is accepted as well.

    :param node: schema node the error gets attached to
    :param value: candidate address or network
    :raises colander.Invalid: when the value cannot be parsed
    """
    try:
        # ip_network() reports anything that is neither IPv4 nor IPv6
        # with a ValueError
        ipaddress.ip_network(safe_str(value), strict=False)
    except ValueError:
        raise colander.Invalid(
            node, _('Please enter a valid IPv4 or IpV6 address'))
r821 | ||||
class IpAddrValidator(object):
    """
    Callable colander validator for IPv4/IPv6 addresses and networks.

    :param strict: forwarded to ``ipaddress.ip_network``; when true, a
        network written with host bits set is rejected
    """

    def __init__(self, strict=True):
        self.strict = strict

    def __call__(self, node, value):
        """
        Validate ``value``, raising ``colander.Invalid`` on ``node`` when it
        cannot be parsed as an IP address/network.
        """
        strict_mode = self.strict
        try:
            # ip_network() reports anything that is neither IPv4 nor IPv6
            # with a ValueError
            ipaddress.ip_network(safe_str(value), strict=strict_mode)
        except ValueError:
            raise colander.Invalid(
                node, _('Please enter a valid IPv4 or IpV6 address'))
def glob_validator(node, value):
    """
    Colander validator checking that ``value`` is a usable glob pattern.

    The glob is converted with ``glob2re``, anchored at both ends and
    compiled; any failure along the way is reported as an invalid pattern.

    :param node: schema node the error gets attached to
    :param value: glob pattern to check
    :raises colander.Invalid: when conversion or compilation fails
    """
    try:
        anchored = ''.join(('^', glob2re(value), '$'))
        re.compile(anchored)
    except Exception:
        raise colander.Invalid(node, _('Invalid glob pattern'))
r1153 | ||||
def valid_name_validator(node, value):
    """
    Colander validator requiring a name to begin with an ASCII letter or
    digit.

    The ``RootLocation`` marker from the validation schema types is accepted
    without further checks.

    :param node: schema node the error gets attached to
    :param value: name to check, or ``types.RootLocation``
    :raises colander.Invalid: when the first character is not ``a-z``,
        ``A-Z`` or ``0-9``
    """
    from rhodecode.model.validation_schema import types

    if value is types.RootLocation:
        return

    # The upper-case range must be `A-Z`: `A-z` also spans the ASCII
    # punctuation between `Z` and `a` ([ \ ] ^ _ `), which let names such as
    # `_name` through despite the error message below.
    if not re.match(r'^[a-zA-Z0-9]', value):
        msg = _('Name must start with a letter or number. Got `{}`').format(value)
        raise colander.Invalid(node, msg)
r1719 | ||||
class InvalidCloneUrl(Exception):
    """
    Raised when a clone URL does not pass validation.
    """

    # URL prefixes that would have been accepted; empty by default, code
    # raising the exception may overwrite it on the instance
    allowed_prefixes = ()
def _clone_url_not_allowed(url, allowed_prefixes):
    # Build the InvalidCloneUrl for a URL matching none of allowed_prefixes.
    exc = InvalidCloneUrl('Clone from URI %s not allowed. '
                          'Allowed url must start with one of %s'
                          % (url, ','.join(allowed_prefixes)))
    exc.allowed_prefixes = allowed_prefixes
    return exc


def url_validator(url, repo_type, config):
    """
    Check that ``url`` is an acceptable clone source for ``repo_type``.

    :param url: clone URL to check
    :param repo_type: one of ``'hg'``, ``'git'`` or ``'svn'``
    :param config: configuration object handed to the backend ``check_url``
    :returns: the result of the backend ``check_url`` call for hg/git,
        ``None`` for svn (not validated yet)
    :raises InvalidCloneUrl: when the URL prefix is not allowed for the
        repository type, or the repository type is unknown
    :raises NotImplementedError: for cross-VCS imports that are recognised
        but not supported (git->hg, svn->git, hg->git)
    """
    from rhodecode.lib.vcs.backends.hg import MercurialRepository
    from rhodecode.lib.vcs.backends.git import GitRepository
    from rhodecode.lib.vcs.backends.svn import SubversionRepository

    if repo_type == 'hg':
        if url.startswith('http'):
            # initially check if it's at least the proper URL
            # or does it pass basic auth
            return MercurialRepository.check_url(url, config)
        if url.startswith('svn+http'):  # svn->hg import
            # Return the result: without it a successful check fell through
            # to the "Invalid repo type specified" error at the bottom.
            return SubversionRepository.check_url(url, config)
        if url.startswith('git+http'):  # git->hg import
            raise NotImplementedError()
        raise _clone_url_not_allowed(url, ('http', 'svn+http', 'git+http'))

    if repo_type == 'git':
        if url.startswith('http'):
            # initially check if it's at least the proper URL
            # or does it pass basic auth
            return GitRepository.check_url(url, config)
        if url.startswith('svn+http'):  # svn->git import
            raise NotImplementedError()
        if url.startswith('hg+http'):  # hg->git import
            raise NotImplementedError()
        raise _clone_url_not_allowed(url, ('http', 'svn+http', 'hg+http'))

    if repo_type == 'svn':
        # no validation for SVN yet
        return

    raise InvalidCloneUrl(f'Invalid repo type specified: `{repo_type}`')
r1719 | ||||
class CloneUriValidator(object):
    """
    Callable colander validator for repository clone URLs.

    :param repo_type: repository type handed to ``url_validator``
    """

    def __init__(self, repo_type):
        self.repo_type = repo_type

    def __call__(self, node, value):
        """
        Validate the clone URL ``value`` for ``self.repo_type``.

        :param node: schema node the error gets attached to
        :param value: clone URL to check
        :raises colander.Invalid: when ``url_validator`` rejects the URL or
            fails with any other exception
        """
        import html
        from rhodecode.lib.utils import make_db_config

        try:
            config = make_db_config(clear_session=False)
            url_validator(value, self.repo_type, config)
        except InvalidCloneUrl as e:
            log.warning(e)
            raise colander.Invalid(node, str(e))
        except Exception as e:
            log.exception('Url validation failed')
            # Escape markup characters in the exception repr. The previous
            # chained str.replace() calls mapped `<` and `>` to themselves
            # and therefore escaped nothing.
            # NOTE(review): presumably the message ends up in an HTML page -
            # confirm against the rendering code.
            reason = html.escape(repr(e), quote=False)
            msg = _('invalid clone url or credentials for {repo_type} repository. Reason: {reason}')\
                .format(reason=reason, repo_type=self.repo_type)
            raise colander.Invalid(node, msg)
r2400 | ||||
def json_validator(node, value):
    """
    Colander validator checking that ``value`` can be decoded as JSON.

    :param node: schema node the error gets attached to
    :param value: serialized JSON document
    :raises colander.Invalid: when ``json.loads`` fails for any reason
    """
    try:
        json.loads(value)
    except Exception:
        raise colander.Invalid(node, _('Please enter a valid json object'))
r4583 | ||||
def json_validator_with_exc(node, value):
    """
    Colander validator checking that ``value`` can be decoded as JSON,
    reporting the input type and the decoding error in the message.

    :param node: schema node the error gets attached to
    :param value: serialized JSON document
    :raises colander.Invalid: when ``json.loads`` fails for any reason
    """
    try:
        json.loads(value)
    except Exception as e:
        # Translate the constant template first and interpolate afterwards;
        # an f-string inside _() is formatted before the lookup, so the
        # message id would never match a catalog entry.
        msg = _('Please enter a valid json object type={value_type}: `{exc}`')\
            .format(value_type=type(value), exc=e)
        raise colander.Invalid(node, msg)