validators.py
804 lines
| 30.1 KiB
| text/x-python
|
PythonLexer
Bradley M. Kuhn
|
r4187 | # -*- coding: utf-8 -*- | ||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
""" | ||||
Set of generic validators | ||||
""" | ||||
Mads Kiilerich
|
r7718 | import logging | ||
Bradley M. Kuhn
|
r4187 | import os | ||
import re | ||||
from collections import defaultdict | ||||
Mads Kiilerich
|
r7716 | |||
Mads Kiilerich
|
r7718 | import formencode | ||
import ipaddr | ||||
import sqlalchemy | ||||
from formencode.validators import CIDR, Bool, Email, FancyValidator, Int, IPAddress, NotEmpty, Number, OneOf, Regex, Set, String, StringBoolean, UnicodeString | ||||
domruf
|
r6561 | from sqlalchemy import func | ||
Mads Kiilerich
|
r7718 | from tg.i18n import ugettext as _ | ||
Bradley M. Kuhn
|
r4187 | |||
Mads Kiilerich
|
r7718 | from kallithea.config.routing import ADMIN_PREFIX | ||
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel | ||||
Bradley M. Kuhn
|
r4187 | from kallithea.lib.compat import OrderedSet | ||
Mads Kiilerich
|
r7718 | from kallithea.lib.exceptions import LdapImportError | ||
Thomas De Schampheleire
|
r7251 | from kallithea.lib.utils import is_valid_repo_uri | ||
Mads Kiilerich
|
r7718 | from kallithea.lib.utils2 import aslist, repo_name_slug, str2bool | ||
from kallithea.model.db import RepoGroup, Repository, User, UserGroup | ||||
Bradley M. Kuhn
|
r4187 | |||
# Reference the names imported from formencode.validators so that pylint and
# similar tools do not report them as unused imports.
(
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
    NotEmpty, IPAddress, CIDR, String, FancyValidator,
)

# Module-level logger shared by the validators below.
log = logging.getLogger(__name__)
Mads Kiilerich
|
def UniqueListFromString():
    """
    Return a validator class that splits its input on ',' and removes
    duplicate entries while keeping the order of first occurrence.
    """
    class _UniqueListFromString(formencode.FancyValidator):
        """
        Split value on ',' and make unique while preserving order
        """
        messages = {
            'empty': _('Value cannot be an empty list'),
            'missing_value': _('Value cannot be an empty list'),
        }

        def _convert_to_python(self, value, state):
            unique_items = []
            already_seen = set()
            for item in aslist(value, ','):
                if item in already_seen:
                    continue
                already_seen.add(item)
                unique_items.append(item)
            return unique_items

        def empty_value(self, value):
            # An empty input converts to an empty list.
            return []

    return _UniqueListFromString
Jiřà Suchan
|
def ValidUsername(edit=False, old_data=None):
    """
    Return a validator class for usernames.

    The validator rejects the reserved names 'default' and 'new_user',
    names for which ``User.get_by_username`` (case insensitive) finds a
    user, and names that do not match the allowed character pattern.

    :param edit: when true, the current name of the user identified by
        ``old_data['user_id']`` is exempt from the uniqueness check.
    :param old_data: mapping describing the user being edited; defaults to
        an empty dict.
    """
    old_data = old_data or {}

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'username_exists': _('Username "%(username)s" already exists'),
            'system_invalid_username':
                _('Username "%(username)s" cannot be used'),
            'invalid_username':
                _('Username may only contain alphanumeric characters '
                  'underscores, periods or dashes and must begin with an '
                  'alphanumeric character or underscore')
        }

        def _validate_python(self, value, state):
            """
            Raise formencode.Invalid if ``value`` is reserved, already
            taken or contains characters that are not allowed.
            """
            if value in ['default', 'new_user']:
                msg = self.message('system_invalid_username', state, username=value)
                raise formencode.Invalid(msg, value, state)

            # check if user is unique
            old_un = None
            if edit:
                old_un = User.get(old_data.get('user_id')).username

            if old_un != value or not edit:
                if User.get_by_username(value, case_insensitive=True):
                    msg = self.message('username_exists', state, username=value)
                    raise formencode.Invalid(msg, value, state)

            # Anchor with \Z instead of $: '$' also matches right before a
            # trailing newline, which would let a name such as 'bob\n' pass.
            if re.match(r'^[a-zA-Z0-9\_]{1}[a-zA-Z0-9\-\_\.]*\Z', value) is None:
                msg = self.message('invalid_username', state)
                raise formencode.Invalid(msg, value, state)

    return _validator
def ValidRegex(msg=None):
    """
    Return a formencode Regex validator subclass whose 'invalid' message is
    ``msg`` or, when ``msg`` is falsy, a generic translated default.
    """
    invalid_message = msg if msg else _('The input is not valid')

    class _validator(formencode.validators.Regex):
        messages = {'invalid': invalid_message}

    return _validator
def ValidRepoUser():
    """
    Return a validator class that accepts a username only if the query for
    active users with that exact username yields exactly one row.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            invalid_username=_('Username %(username)s is not valid'),
        )

        def _validate_python(self, value, state):
            try:
                # `== True` is intentional: it builds a SQL expression.
                (User.query()
                     .filter(User.active == True)
                     .filter(User.username == value)
                     .one())
            except sqlalchemy.exc.InvalidRequestError:
                # covers NoResultFound / MultipleResultsFound
                text = self.message('invalid_username', state, username=value)
                raise formencode.Invalid(
                    text, value, state,
                    error_dict={'username': text},
                )

    return _validator
Jiřà Suchan
|
def ValidUserGroup(edit=False, old_data=None):
    """
    Return a validator class for user group names.

    The validator rejects the reserved name 'default', names for which
    ``UserGroup.get_by_group_name`` (case insensitive) finds a group, and
    names that do not match the allowed character pattern. All errors are
    reported on the ``users_group_name`` field.

    :param edit: when true, the current name of the group identified by
        ``old_data['users_group_id']`` is exempt from the uniqueness check.
    :param old_data: mapping describing the group being edited; defaults to
        an empty dict.
    """
    old_data = old_data or {}

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_group': _('Invalid user group name'),
            'group_exist': _('User group "%(usergroup)s" already exists'),
            'invalid_usergroup_name':
                _('user group name may only contain alphanumeric '
                  'characters underscores, periods or dashes and must begin '
                  'with alphanumeric character')
        }

        def _validate_python(self, value, state):
            """
            Raise formencode.Invalid if ``value`` is reserved, already
            taken or does not match the allowed pattern.
            """
            if value in ['default']:
                msg = self.message('invalid_group', state)
                raise formencode.Invalid(msg, value, state,
                    error_dict=dict(users_group_name=msg)
                )

            # check if group is unique
            old_ugname = None
            if edit:
                old_id = old_data.get('users_group_id')
                old_ugname = UserGroup.get(old_id).users_group_name

            if old_ugname != value or not edit:
                is_existing_group = UserGroup.get_by_group_name(value,
                                                        case_insensitive=True)
                if is_existing_group:
                    msg = self.message('group_exist', state, usergroup=value)
                    raise formencode.Invalid(msg, value, state,
                        error_dict=dict(users_group_name=msg)
                    )

            # Anchor with \Z instead of $: '$' also matches right before a
            # trailing newline, which would let a name such as 'devs\n' pass.
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+\Z', value) is None:
                msg = self.message('invalid_usergroup_name', state)
                raise formencode.Invalid(msg, value, state,
                    error_dict=dict(users_group_name=msg)
                )

    return _validator
Jiřà Suchan
|
def ValidRepoGroup(edit=False, old_data=None):
    """
    Return a validator class for repository group forms.

    The validator works on the whole form dict: it refuses a group being
    made its own parent and, for new or renamed groups, refuses a name
    whose slug matches (case insensitively) a group under the same parent
    or a repository name.
    """
    old_data = old_data or {}

    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            parent_group_id=_('Cannot assign this group as parent'),
            group_exists=_('Group "%(group_name)s" already exists'),
            repo_exists=_('Repository with name "%(group_name)s" already exists'),
        )

        def _validate_python(self, value, state):
            # TODO WRITE VALIDATIONS
            group_name = value.get('group_name')
            parent_group_id = value.get('parent_group_id')

            # slugify repo group just in case :)
            slug = repo_name_slug(group_name)

            # a group must not be assigned as its own parent
            if edit and parent_group_id and old_data['group_id'] == parent_group_id:
                text = self.message('parent_group_id', state)
                raise formencode.Invalid(
                    text, value, state,
                    error_dict={'parent_group_id': text},
                )

            if edit:
                old_gname = RepoGroup.get(old_data.get('group_id')).group_name
            else:
                old_gname = None

            if old_gname != group_name or not edit:
                lowered_slug = func.lower(slug)

                # a group with the same name below the same parent
                same_group_query = RepoGroup.query() \
                    .filter(func.lower(RepoGroup.group_name) == lowered_slug) \
                    .filter(RepoGroup.parent_group_id == parent_group_id)
                if same_group_query.scalar() is not None:
                    text = self.message('group_exists', state, group_name=slug)
                    raise formencode.Invalid(
                        text, value, state,
                        error_dict={'group_name': text},
                    )

                # a repository with the same name
                same_repo_query = Repository.query() \
                    .filter(func.lower(Repository.repo_name) == func.lower(slug))
                if same_repo_query.scalar() is not None:
                    text = self.message('repo_exists', state, group_name=slug)
                    raise formencode.Invalid(
                        text, value, state,
                        error_dict={'group_name': text},
                    )

    return _validator
def ValidPassword():
    """
    Return a validator class that rejects passwords which cannot be encoded
    as ASCII. A falsy value is treated as the empty string.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            invalid_password=_('Invalid characters (non-ascii) in password'),
        )

        def _validate_python(self, value, state):
            candidate = value or ''
            try:
                candidate.encode('ascii')
            except UnicodeError:
                text = self.message('invalid_password', state)
                raise formencode.Invalid(text, value, state)

    return _validator
def ValidOldPassword(username):
    """
    Return a validator class that checks the submitted value as the current
    password of ``username`` by calling ``auth_modules.authenticate``.
    Failures are reported on the ``current_password`` field.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            invalid_password=_('Invalid old password'),
        )

        def _validate_python(self, value, state):
            from kallithea.lib import auth_modules

            auth_result = auth_modules.authenticate(username, value, '')
            if auth_result is not None:
                return
            text = self.message('invalid_password', state)
            raise formencode.Invalid(
                text, value, state,
                error_dict={'current_password': text},
            )

    return _validator
Mads Kiilerich
|
def ValidPasswordsMatch(password_field, password_confirmation_field):
    """
    Return a validator class that compares two entries of the form dict and
    reports a mismatch on both fields.

    :param password_field: key of the password entry (read with ``.get``).
    :param password_confirmation_field: key of the confirmation entry
        (read by subscription, so it must be present in the form dict).
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            password_mismatch=_('Passwords do not match'),
        )

        def _validate_python(self, value, state):
            entered = value.get(password_field)
            confirmed = value[password_confirmation_field]
            if entered != confirmed:
                text = self.message('password_mismatch', state)
                field_errors = {
                    password_field: text,
                    password_confirmation_field: text,
                }
                raise formencode.Invalid(text, value, state,
                                         error_dict=field_errors)

    return _validator
def ValidAuth():
    """
    Return a validator class for login forms. It authenticates the
    'username' / 'password' entries of the form dict and raises the same
    generic error whether the account is disabled or the credentials are
    wrong; only the logged warning differs.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            invalid_auth=_(u'Invalid username or password'),
        )

        def _validate_python(self, value, state):
            from kallithea.lib import auth_modules

            password = value['password']
            username = value['username']

            # authenticate returns unused dict but has called
            # plugin._authenticate which has create_or_update'ed the username user in db
            if auth_modules.authenticate(username, password) is not None:
                return

            user = User.get_by_username_or_email(username)
            if user and not user.active:
                log.warning('user %s is disabled', username)
            else:
                log.warning('user %s failed to authenticate', username)

            text = self.message('invalid_auth', state)
            raise formencode.Invalid(
                text, value, state,
                error_dict={'username': ' ', 'password': text},
            )

    return _validator
Jiřà Suchan
|
def ValidRepoName(edit=False, old_data=None):
    """
    Return a validator class for repository names, working on the whole
    form dict.

    Conversion slugifies 'repo_name' and stores 'repo_name_full',
    'group_path' and 'group_name' in the form dict. Validation refuses
    reserved names and, when creating or renaming, names that collide with
    an existing repository or repository group.
    """
    old_data = old_data or {}

    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            invalid_repo_name=_('Repository name %(repo)s is not allowed'),
            repository_exists=_('Repository named %(repo)s already exists'),
            repository_in_group_exists=_('Repository "%(repo)s" already '
                                         'exists in group "%(group)s"'),
            same_group_exists=_('Repository group with name "%(repo)s" '
                                'already exists'),
        )

        def _convert_to_python(self, value, state):
            repo_name = repo_name_slug(value.get('repo_name', ''))
            repo_group = value.get('repo_group')

            # defaults for a repository that is not inside any group
            group_name = group_path = ''
            repo_name_full = repo_name
            if repo_group:
                gr = RepoGroup.get(repo_group)
                group_path = gr.full_path
                group_name = gr.group_name
                # the full name (group path + separator + name) is what is
                # looked up / stored in the database
                repo_name_full = group_path + RepoGroup.url_sep() + repo_name

            value['repo_name'] = repo_name
            value['repo_name_full'] = repo_name_full
            value['group_path'] = group_path
            value['group_name'] = group_name
            return value

        def _validate_python(self, value, state):
            repo_name = value.get('repo_name')
            repo_name_full = value.get('repo_name_full')
            group_path = value.get('group_path')
            group_name = value.get('group_name')

            if repo_name in [ADMIN_PREFIX, '']:
                text = self.message('invalid_repo_name', state, repo=repo_name)
                raise formencode.Invalid(
                    text, value, state,
                    error_dict={'repo_name': text},
                )

            rename = old_data.get('repo_name') != repo_name_full
            create = not edit
            if not (rename or create):
                return value

            repo = Repository.get_by_repo_name(repo_name_full, case_insensitive=True)
            repo_group = RepoGroup.get_by_group_name(repo_name_full, case_insensitive=True)

            text = None
            if group_path != '':
                if repo is not None:
                    text = self.message('repository_in_group_exists', state,
                                        repo=repo.repo_name, group=group_name)
            elif repo_group is not None:
                text = self.message('same_group_exists', state,
                                    repo=repo_name)
            elif repo is not None:
                text = self.message('repository_exists', state,
                                    repo=repo.repo_name)

            if text is not None:
                raise formencode.Invalid(
                    text, value, state,
                    error_dict={'repo_name': text},
                )
            return value

    return _validator
def ValidForkName(*args, **kwargs):
    """
    Return the validator class for fork names; forks use exactly the
    validator produced by ValidRepoName with the same arguments.
    """
    fork_validator = ValidRepoName(*args, **kwargs)
    return fork_validator
def SlugifyName():
    """
    Return a validator class that converts its input with repo_name_slug
    and performs no further validation.
    """
    class _validator(formencode.validators.FancyValidator):

        def _convert_to_python(self, value, state):
            slug = repo_name_slug(value)
            return slug

        def _validate_python(self, value, state):
            # Nothing to validate: any slug is accepted.
            return None

    return _validator
def ValidCloneUri():
    """
    Return a validator class for the clone URL of a repository form.

    The 'clone_uri' entry is checked with is_valid_repo_uri for the form's
    'repo_type', unless it is empty or equal to 'clone_uri_hidden'. Any
    exception from the check is logged and reported as an invalid URL on
    the ``clone_uri`` field.
    """
    from kallithea.lib.utils import make_ui

    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            clone_uri=_('Invalid repository URL'),
            invalid_clone_uri=_('Invalid repository URL. It must be a '
                                'valid http, https, ssh, svn+http or svn+https URL'),
        )

        def _validate_python(self, value, state):
            repo_type = value.get('repo_type')
            url = value.get('clone_uri')

            # nothing to check for an empty or unchanged (hidden) URL
            if not url or url == value.get('clone_uri_hidden'):
                return

            try:
                is_valid_repo_uri(repo_type, url, make_ui())
            except Exception:
                log.exception('URL validation failed')
                text = self.message('clone_uri', state)
                raise formencode.Invalid(
                    text, value, state,
                    error_dict={'clone_uri': text},
                )

    return _validator
Jiřà Suchan
|
def ValidForkType(old_data=None):
    """
    Return a validator class that requires the submitted value to equal
    ``old_data['repo_type']``; a mismatch is reported on ``repo_type``.
    """
    old_data = old_data or {}

    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            invalid_fork_type=_('Fork has to be the same type as parent'),
        )

        def _validate_python(self, value, state):
            parent_type = old_data['repo_type']
            if parent_type != value:
                text = self.message('invalid_fork_type', state)
                raise formencode.Invalid(
                    text, value, state,
                    error_dict={'repo_type': text},
                )

    return _validator
def CanWriteGroup(old_data=None):
    """
    Return a validator class that checks whether the current user may
    create a repository in the selected repository group.

    The value -1 stands for the root location and is converted to None.
    Permissions are only enforced for new entries (no ``old_data``) or when
    the selected group differs from ``old_data['repo_group']['group_id']``.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            permission_denied=_("You don't have permissions "
                                "to create repository in this group"),
            permission_denied_root=_("no permission to create repository "
                                     "in root location"),
        )

        def _convert_to_python(self, value, state):
            # -1 is the root location
            return None if value == -1 else value

        def _validate_python(self, value, state):
            gr = RepoGroup.get(value)
            # None means ROOT location
            gr_name = None if gr is None else gr.group_name

            # create repositories with write permission on group is set to true
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
            group_admin = HasRepoGroupPermissionLevel('admin')(
                gr_name, 'can write into group validator')
            group_write = HasRepoGroupPermissionLevel('write')(
                gr_name, 'can write into group validator')
            forbidden = not (group_admin or (group_write and create_on_write))
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')

            if old_data and 'repo_group' in old_data:
                gid = old_data['repo_group'].get('group_id')
            else:
                gid = None
            value_changed = gid != value
            new = not old_data

            # Only check when the value changed or the entry is new: someone
            # whose write permission was revoked after creating a repository
            # may keep the group that is already set in the form.
            if not (value_changed or new):
                return

            if gr and forbidden:
                # parent group need to be existing
                message_key = 'permission_denied'
            elif gr is None and not can_create_repos():
                # check if we can write to root location
                message_key = 'permission_denied_root'
            else:
                return

            text = self.message(message_key, state)
            raise formencode.Invalid(
                text, value, state,
                error_dict={'repo_type': text},
            )

    return _validator
def CanCreateGroup(can_create_in_root=False):
    """
    Return a validator class that checks whether the current user may
    create a repository group below the selected parent group.

    :param can_create_in_root: when true, the root location (no parent
        group) is accepted without any permission check.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            permission_denied=_("You don't have permissions "
                                "to create a group in this location"),
        )

        # NOTE(review): this overrides to_python itself, whereas
        # CanWriteGroup overrides _convert_to_python; this may bypass
        # _validate_python below - confirm against formencode.
        def to_python(self, value, state):
            # -1 is the root location
            return None if value == -1 else value

        def _validate_python(self, value, state):
            gr = RepoGroup.get(value)
            # None means ROOT location
            gr_name = None if gr is None else gr.group_name
            in_root = gr is None

            if can_create_in_root and in_root:
                # we can create in root, we're fine no validations required
                return

            forbidden_in_root = in_root and not can_create_in_root
            is_group_admin = HasRepoGroupPermissionLevel('admin')(
                gr_name, 'can create group validator')
            forbidden = not is_group_admin
            if forbidden_in_root or forbidden:
                text = self.message('permission_denied', state)
                raise formencode.Invalid(
                    text, value, state,
                    error_dict={'parent_group_id': text},
                )

    return _validator
def ValidPerms(type_='repo'):
    """
    Return a validator class that extracts permission changes from a
    permission form dict.

    The converted form dict gains two entries:

    * ``perms_updates``: list of ``(member, permission, type)`` tuples built
      from the ``u_perm_<name>`` / ``g_perm_<name>`` entries.
    * ``perms_new``: list of ``(member, permission, type)`` tuples built
      from the ``perm_new_member_*`` entries, which are removed from the
      form dict.

    :param type_: 'repo', 'repo_group' or 'user_group'; selects the
        permission assigned to the default user when 'repo_private' is set.
    """
    if type_ == 'repo_group':
        EMPTY_PERM = 'group.none'
    elif type_ == 'repo':
        EMPTY_PERM = 'repository.none'
    elif type_ == 'user_group':
        EMPTY_PERM = 'usergroup.none'

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'perm_new_member_name':
                _('This username or user group name is not valid')
        }

        def to_python(self, value, state):
            """
            Collect permission updates and additions from ``value`` and
            raise formencode.Invalid if a newly added member does not
            resolve to exactly one active user / user group.
            """
            perms_update = OrderedSet()
            perms_new = OrderedSet()
            # build a list of permission to update and new permission to create

            # Remove the 'perm_new_member*' entries from the form dict and
            # group their values by position.
            new_perms_group = defaultdict(dict)
            for k, v in value.copy().items():
                if k.startswith('perm_new_member'):
                    del value[k]
                    _type, part = k.split('perm_new_member_')
                    args = part.split('_')
                    if len(args) == 1:
                        # 'perm_new_member_<pos>' carries the permission
                        new_perms_group[args[0]]['perm'] = v
                    elif len(args) == 2:
                        # 'perm_new_member_<key>_<pos>'
                        _key, pos = args
                        new_perms_group[pos][_key] = v

            # fill new permissions in order of how they were added
            for k in sorted(new_perms_group, key=lambda k: int(k)):
                perm_dict = new_perms_group[k]
                new_member = perm_dict.get('name')
                new_perm = perm_dict.get('perm')
                new_type = perm_dict.get('type')
                if new_member and new_perm and new_type:
                    perms_new.add((new_member, new_perm, new_type))

            for k, v in value.items():
                if k.startswith(('u_perm_', 'g_perm_')):
                    member = k[7:]
                    t = {'u': 'user',
                         'g': 'users_group'
                    }[k[0]]
                    if member == User.DEFAULT_USER:
                        if str2bool(value.get('repo_private')):
                            # set none for default when updating to
                            # private repo protects against form manipulation
                            v = EMPTY_PERM
                    perms_update.add((member, v, t))

            value['perms_updates'] = list(perms_update)
            value['perms_new'] = list(perms_new)

            # every new member must resolve to one active user / user group
            for k, v, t in perms_new:
                try:
                    if t == 'user':
                        self.user_db = User.query() \
                            .filter(User.active == True) \
                            .filter(User.username == k).one()
                    if t == 'users_group':
                        self.user_db = UserGroup.query() \
                            .filter(UserGroup.users_group_active == True) \
                            .filter(UserGroup.users_group_name == k).one()
                except Exception:
                    log.exception('Updated permission failed')
                    # Use the key that is actually declared in `messages`;
                    # the former 'perm_new_member_type' is not declared, so
                    # looking it up failed before Invalid could be raised.
                    msg = self.message('perm_new_member_name', state)
                    raise formencode.Invalid(msg, value, state,
                        error_dict=dict(perm_new_member_name=msg)
                    )
            return value

    return _validator
def ValidSettings():
    """
    Return a validator class that removes entries non-admin users must not
    change from a settings form dict.
    """
    class _validator(formencode.validators.FancyValidator):

        def _convert_to_python(self, value, state):
            # settings form for users that are not admin
            # can't edit certain parameters, it's extra backup if they mangle
            # with forms
            forbidden_params = (
                'user',
                'repo_type',
                'repo_enable_downloads',
                'repo_enable_statistics',
            )
            present = [name for name in forbidden_params if name in value]
            for name in present:
                del value[name]
            return value

        def _validate_python(self, value, state):
            # Nothing to validate after the forbidden entries are dropped.
            return None

    return _validator
def ValidPath():
    """
    Return a validator class that requires the value to be an existing
    directory; failures are reported on the ``paths_root_path`` field.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            invalid_path=_('This is not a valid path'),
        )

        def _validate_python(self, value, state):
            if os.path.isdir(value):
                return
            text = self.message('invalid_path', state)
            raise formencode.Invalid(
                text, value, state,
                error_dict={'paths_root_path': text},
            )

    return _validator
Jiřà Suchan
|
def UniqSystemEmail(old_data=None):
    """
    Return a validator class that lower-cases an email address and rejects
    it when ``User.get_by_email`` finds a user for it, unless it equals the
    lower-cased ``old_data['email']``.
    """
    old_data = old_data or {}

    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            email_taken=_('This email address is already in use'),
        )

        def _convert_to_python(self, value, state):
            lowered = value.lower()
            return lowered

        def _validate_python(self, value, state):
            previous_email = (old_data.get('email') or '').lower()
            if previous_email != value:
                owner = User.get_by_email(value)
                if owner is not None:
                    text = self.message('email_taken', state)
                    raise formencode.Invalid(
                        text, value, state,
                        error_dict={'email': text},
                    )

    return _validator
def ValidSystemEmail():
    """
    Return a validator class that lower-cases an email address and rejects
    it when ``User.get_by_email`` finds no user for it.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            non_existing_email=_('Email address "%(email)s" not found'),
        )

        def _convert_to_python(self, value, state):
            lowered = value.lower()
            return lowered

        def _validate_python(self, value, state):
            owner = User.get_by_email(value)
            if owner is not None:
                return
            text = self.message('non_existing_email', state, email=value)
            raise formencode.Invalid(
                text, value, state,
                error_dict={'email': text},
            )

    return _validator
def LdapLibValidator():
    """
    Return a validator class that raises LdapImportError when the ``ldap``
    module cannot be imported; the validated value itself is not inspected.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict()

        def _validate_python(self, value, state):
            try:
                import ldap  # noqa: F401 - imported only to probe availability
            except ImportError:
                raise LdapImportError()

    return _validator
def AttrLoginValidator():
    """
    Return a UnicodeString validator subclass whose 'empty' message is the
    same text as its 'invalid_cn' message.
    """
    missing_cn_text = _('The LDAP Login attribute of the CN must be specified - '
                        'this is the name of the attribute that is equivalent '
                        'to "username"')

    class _validator(formencode.validators.UnicodeString):
        messages = {
            'invalid_cn': missing_cn_text,
            'empty': missing_cn_text,
        }

    return _validator
def ValidIp():
    """
    Return a CIDR validator subclass for IPv4 / IPv6 addresses and
    networks. Addresses given without a mask get '/32' (IPv4) or '/128'
    (IPv6) appended during conversion.
    """
    class _validator(CIDR):
        messages = {
            'badFormat': _('Please enter a valid IPv4 or IPv6 address'),
            'illegalBits': _('The network size (bits) must be within the range'
                             ' of 0-32 (not %(bits)r)'),
        }

        def to_python(self, value, state):
            converted = super(_validator, self).to_python(value, state)
            converted = converted.strip()
            net = ipaddr.IPNetwork(address=converted)
            # append the single-host mask when the input had no mask
            default_masks = (
                (ipaddr.IPv4Network, '/32'),
                (ipaddr.IPv6Network, '/128'),
            )
            for network_type, mask in default_masks:
                if isinstance(net, network_type) and '/' not in value:
                    converted += mask
            return converted

        def _validate_python(self, value, state):
            stripped = value.strip()
            try:
                # this raises an ValueError if address is not IPv4 or IPv6
                ipaddr.IPNetwork(address=stripped)
            except ValueError:
                raise formencode.Invalid(self.message('badFormat', state),
                                         value, state)

    return _validator
def FieldKey():
    """
    Return a validator class for field key names: one or more letters,
    digits, underscores or dashes and nothing else.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            badFormat=_('Key name can only consist of letters, '
                        'underscore, dash or numbers')
        )

        def _validate_python(self, value, state):
            """
            Raise formencode.Invalid if ``value`` contains any character
            outside the allowed set.
            """
            # Anchor with \Z instead of $: '$' also matches right before a
            # trailing newline, which would let a key such as 'key\n' pass.
            if not re.match(r'[a-zA-Z0-9_-]+\Z', value):
                raise formencode.Invalid(self.message('badFormat', state),
                                         value, state)

    return _validator
def BasePath():
    """
    Return a validator class that accepts only values equal to their own
    ``os.path.basename``, i.e. file names without a directory part.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'badPath': _('Filename cannot be inside a directory'),
        }

        def _convert_to_python(self, value, state):
            # The value is passed through unchanged.
            return value

        def _validate_python(self, value, state):
            base_name = os.path.basename(value)
            if value != base_name:
                raise formencode.Invalid(self.message('badPath', state),
                                         value, state)

    return _validator
def ValidAuthPlugins():
    """
    Return a validator class for a list of authentication plugin modules.

    Conversion drops None and '' entries. Validation loads every module
    with ``auth_modules.loadplugin`` and rejects the list if two plugins
    report the same ``name``, or if loading raises ImportError,
    AttributeError or TypeError.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            import_duplicate=_('Plugins %(loaded)s and %(next_to_load)s both export the same name')
        )

        def _convert_to_python(self, value, state):
            # filter empty values
            return [s for s in value if s not in [None, '']]

        def _validate_python(self, value, state):
            """
            Raise formencode.Invalid on duplicate plugin names or on a
            plugin that fails to load.
            """
            from kallithea.lib import auth_modules

            module_list = value
            # plugin name -> module entry that first exported that name
            modules_by_name = {}
            try:
                for module in module_list:
                    plugin = auth_modules.loadplugin(module)
                    plugin_name = plugin.name
                    if plugin_name in modules_by_name:
                        # Name the two conflicting modules in the message;
                        # previously it interpolated the plugin object and
                        # the shared name instead.
                        msg = self.message('import_duplicate', state,
                                loaded=modules_by_name[plugin_name],
                                next_to_load=module)
                        raise formencode.Invalid(msg, value, state)
                    modules_by_name[plugin_name] = module
            except (ImportError, AttributeError, TypeError) as e:
                raise formencode.Invalid(str(e), value, state)

    return _validator