validators.py
924 lines
| 33.6 KiB
| text/x-python
|
PythonLexer
Bradley M. Kuhn
|
r4116 | # -*- coding: utf-8 -*- | ||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
r2466 | """ | |||
Set of generic validators | ||||
""" | ||||
Bradley M. Kuhn
|
r4116 | |||
r2466 | import os | |||
import re | ||||
import formencode | ||||
import logging | ||||
r2759 | from collections import defaultdict | |||
r2466 | from pylons.i18n.translation import _ | |||
from webhelpers.pylonslib.secure_form import authentication_token | ||||
from formencode.validators import ( | ||||
r2711 | UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, | |||
r3840 | NotEmpty, IPAddress, CIDR, String, FancyValidator | |||
r2466 | ) | |||
r2759 | from rhodecode.lib.compat import OrderedSet | |||
r3212 | from rhodecode.lib import ipaddr | |||
r2466 | from rhodecode.lib.utils import repo_name_slug | |||
Bradley M. Kuhn
|
r4116 | from rhodecode.lib.utils2 import safe_int, str2bool, aslist | ||
Mads Kiilerich
|
r3417 | from rhodecode.model.db import RepoGroup, Repository, UserGroup, User,\ | ||
r2719 | ChangesetStatus | |||
r2466 | from rhodecode.lib.exceptions import LdapImportError | |||
from rhodecode.config.routing import ADMIN_PREFIX | ||||
Bradley M. Kuhn
|
r4116 | from rhodecode.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny | ||
r2719 | ||||
r2466 | # silence warnings and pylint | |||
r2719 | UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \ | |||
r3840 | NotEmpty, IPAddress, CIDR, String, FancyValidator | |||
r2466 | ||||
log = logging.getLogger(__name__) | ||||
Bradley M. Kuhn
|
class _Missing(object):
    """Type of the ``Missing`` sentinel; it has no state and no behaviour."""


# module level sentinel, the single instance of _Missing created here
Missing = _Missing()
r2719 | ||||
class StateObj(object):
    """
    Minimal ``state`` object handed to validators.

    It only exposes the translation function as ``_`` so that validator
    messages can be translated.
    """
    # staticmethod keeps the function from being bound to the instance
    _ = staticmethod(_)
def M(self, key, state=None, **kwargs):
    """
    Return the message stored under ``key`` on the validator ``self``.

    Keyword arguments are used to substitute ``%(named)s`` params inside
    the translated string.

    :param self: validator whose ``message()`` method does the lookup
    :param key: name of the message to look up
    :param state: state object passed on to ``message()``; a fresh
        :class:`StateObj` is used when it is ``None``
    :returns: the result of ``self.message(key, state, **kwargs)``
    """
    if state is None:
        state = StateObj()
    else:
        # Inject the translator into the caller supplied state object.
        # The plain function is assigned on purpose: a staticmethod wrapper
        # is only unwrapped when it is found on the class, on an instance
        # it would stay a (non callable) staticmethod object.
        state._ = _
    return self.message(key, state, **kwargs)
Bradley M. Kuhn
|
def UniqueList():
    """
    Build a validator class that converts its input into a list without
    duplicates; the first occurrence of every item is kept.
    """
    class _UniqueList(formencode.FancyValidator):
        """
        Unique List !
        """
        messages = {
            'empty': _('Value cannot be an empty list'),
            'missing_value': _('Value cannot be an empty list'),
        }

        def _to_python(self, value, state):
            if value is None:
                return []
            if not isinstance(value, (list, set, tuple)):
                # anything that is not a collection becomes a one item list
                return [value]

            # membership is checked against a list, not a set, so the
            # items do not have to be hashable
            unique = []
            for item in value:
                if item not in unique:
                    unique.append(item)
            return unique

        def empty_value(self, value):
            return []

    return _UniqueList
def UniqueListFromString():
    """
    Build a validator class based on the one from ``UniqueList()`` that
    first runs string input through ``aslist(value, ',')``.
    """
    base = UniqueList()

    class _UniqueListFromString(base):
        def _to_python(self, value, state):
            items = value
            if isinstance(items, basestring):
                items = aslist(items, ',')
            return super(_UniqueListFromString, self)._to_python(items, state)

    return _UniqueListFromString
def ValidUsername(edit=False, old_data={}):
    """
    Build a validator class for user names.

    A name is rejected when it is reserved, when it is already used by
    another user (compared case insensitively) or when it contains
    characters outside of the allowed pattern.

    :param edit: True when an existing user is edited; the name that user
        currently has is then not reported as a duplicate
    :param old_data: data of the edited user, only ``user_id`` is read
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'username_exists': _(u'Username "%(username)s" already exists'),
            'system_invalid_username':
                _(u'Username "%(username)s" is forbidden'),
            'invalid_username':
                _(u'Username may only contain alphanumeric characters '
                  'underscores, periods or dashes and must begin with '
                  'alphanumeric character or underscore')
        }

        def validate_python(self, value, state):
            # names reserved by the system
            if value in ('default', 'new_user'):
                msg = M(self, 'system_invalid_username', state, username=value)
                raise formencode.Invalid(msg, value, state)

            # uniqueness, skipped when an edited user keeps its own name
            current_name = None
            if edit:
                current_name = User.get(old_data.get('user_id')).username
            if current_name != value or not edit:
                if User.get_by_username(value, case_insensitive=True):
                    msg = M(self, 'username_exists', state, username=value)
                    raise formencode.Invalid(msg, value, state)

            # allowed characters
            pattern = r'^[a-zA-Z0-9\_]{1}[a-zA-Z0-9\-\_\.]*$'
            if re.match(pattern, value) is None:
                msg = M(self, 'invalid_username', state)
                raise formencode.Invalid(msg, value, state)

    return _validator
Bradley M. Kuhn
|
def ValidRegex(msg=None):
    """
    Build a ``Regex`` validator class whose ``invalid`` message is ``msg``,
    or a generic translated text when no message is given.
    """
    invalid_message = msg or _('The input is not valid')

    class _validator(formencode.validators.Regex):
        messages = {'invalid': invalid_message}

    return _validator
def ValidRepoUser():
    """
    Build a validator class that accepts a value only when exactly one
    active user with that user name can be fetched.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_username': _(u'Username %(username)s is not valid')
        }

        def validate_python(self, value, state):
            try:
                query = User.query()
                query = query.filter(User.active == True)
                query = query.filter(User.username == value)
                # one() raises unless there is exactly one match
                query.one()
            except Exception:
                msg = M(self, 'invalid_username', state, username=value)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'username': msg})

    return _validator
Mads Kiilerich
|
def ValidUserGroup(edit=False, old_data={}):
    """
    Build a validator class for user group names.

    :param edit: True when an existing user group is edited; the name the
        group currently has is then not reported as a duplicate
    :param old_data: data of the edited group, only ``users_group_id`` is
        read
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_group': _(u'Invalid user group name'),
            'group_exist': _(u'User group "%(usergroup)s" already exists'),
            'invalid_usergroup_name':
                _(u'user group name may only contain alphanumeric '
                  'characters underscores, periods or dashes and must begin '
                  'with alphanumeric character')
        }

        def validate_python(self, value, state):
            def fail(key, **params):
                # every failure is reported on the users_group_name field
                msg = M(self, key, state, **params)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'users_group_name': msg})

            if value in ('default',):
                fail('invalid_group')

            # uniqueness, skipped when an edited group keeps its own name
            current_name = None
            if edit:
                group_id = old_data.get('users_group_id')
                current_name = UserGroup.get(group_id).users_group_name
            if current_name != value or not edit:
                existing = UserGroup.get_by_group_name(value,
                                                       case_insensitive=True)
                if existing:
                    fail('group_exist', usergroup=value)

            # allowed characters
            pattern = r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$'
            if re.match(pattern, value) is None:
                fail('invalid_usergroup_name')

    return _validator
Bradley M. Kuhn
|
def ValidRepoGroup(edit=False, old_data={}):
    """
    Build a validator class for the repository group form.

    The validated value is the form dict; ``group_name`` and
    ``group_parent_id`` are read from it.

    :param edit: True when an existing group is edited
    :param old_data: data of the edited group, only ``group_id`` is read
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'group_parent_id': _(u'Cannot assign this group as parent'),
            'group_exists': _(u'Group "%(group_name)s" already exists'),
            'repo_exists':
                _(u'Repository with name "%(group_name)s" already exists')
        }

        def validate_python(self, value, state):
            # TODO WRITE VALIDATIONS
            def fail(key, field, **params):
                msg = M(self, key, state, **params)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={field: msg})

            group_name = value.get('group_name')
            group_parent_id = value.get('group_parent_id')

            # slugify repo group just in case :)
            slug = repo_name_slug(group_name)

            # a group must not become its own parent
            if (edit and group_parent_id
                    and old_data['group_id'] == int(group_parent_id)):
                fail('group_parent_id', 'group_parent_id')

            current_name = None
            if edit:
                current_name = RepoGroup.get(old_data.get('group_id')).group_name

            if current_name != group_name or not edit:
                # a group with that name below the same parent
                same_group = RepoGroup.query()\
                    .filter(RepoGroup.group_name == slug)\
                    .filter(RepoGroup.group_parent_id == group_parent_id)\
                    .scalar()
                if same_group:
                    fail('group_exists', 'group_name', group_name=slug)

                # a repository with that name
                same_repo = Repository.query()\
                    .filter(Repository.repo_name == slug)\
                    .scalar()
                if same_repo:
                    fail('repo_exists', 'group_name', group_name=slug)

    return _validator
def ValidPassword():
    """
    Build a validator class that rejects passwords which cannot be decoded
    as ascii; an empty value is accepted.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_password':
                _(u'Invalid characters (non-ascii) in password')
        }

        def validate_python(self, value, state):
            password = value or ''
            try:
                password.decode('ascii')
            except UnicodeError:
                msg = M(self, 'invalid_password', state)
                raise formencode.Invalid(msg, value, state)

    return _validator
Bradley M. Kuhn
|
def ValidOldPassword(username):
    """
    Build a validator class that accepts a value only when
    ``auth_modules.authenticate`` succeeds for ``username`` with it.

    :param username: name of the user whose current password is checked
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_password': _(u'Invalid old password')
        }

        def validate_python(self, value, state):
            from rhodecode.lib import auth_modules

            if auth_modules.authenticate(username, value, ''):
                return
            msg = M(self, 'invalid_password', state)
            raise formencode.Invalid(msg, value, state,
                                     error_dict={'current_password': msg})

    return _validator
def ValidPasswordsMatch(passwd='new_password', passwd_confirmation='password_confirmation'):
    """
    Build a validator class that compares a password with its confirmation.

    :param passwd: form key of the password; a non empty ``password`` key
        takes precedence over it
    :param passwd_confirmation: form key of the repeated password
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'password_mismatch': _(u'Passwords do not match'),
        }

        def validate_python(self, value, state):
            entered = value.get('password') or value.get(passwd)
            if entered == value[passwd_confirmation]:
                return
            msg = M(self, 'password_mismatch', state)
            # the same message is attached to both fields
            errors = {passwd: msg, passwd_confirmation: msg}
            raise formencode.Invalid(msg, value, state, error_dict=errors)

    return _validator
def ValidAuth():
    """
    Build a validator class for the login form; ``username`` and
    ``password`` are read from the validated dict and checked with
    ``auth_modules.authenticate``.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_password': _(u'invalid password'),
            'invalid_username': _(u'invalid user name'),
            'disabled_account': _(u'Your account is disabled')
        }

        def validate_python(self, value, state):
            from rhodecode.lib import auth_modules

            password = value['password']
            username = value['username']

            if auth_modules.authenticate(username, password):
                return

            # find out why the login failed to give a fitting message
            user = User.get_by_username(username)
            if user and not user.active:
                log.warning('user %s is disabled' % username)
                msg = M(self, 'disabled_account', state)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'username': msg})

            log.warning('user %s failed to authenticate' % username)
            msg = M(self, 'invalid_username', state)
            msg2 = M(self, 'invalid_password', state)
            raise formencode.Invalid(msg, value, state,
                                     error_dict={'username': msg,
                                                 'password': msg2})

    return _validator
def ValidAuthToken():
    """
    Build a validator class that accepts only the value returned by
    ``authentication_token()``.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_token': _(u'Token mismatch')
        }

        def validate_python(self, value, state):
            expected = authentication_token()
            if value != expected:
                msg = M(self, 'invalid_token', state)
                raise formencode.Invalid(msg, value, state)

    return _validator
def ValidRepoName(edit=False, old_data={}):
    """
    Build a validator class for the repository name of a repository form.

    The conversion step slugifies ``repo_name`` and stores ``repo_name``,
    ``repo_name_full``, ``group_path`` and ``group_name`` in the form dict;
    the validation step checks the name against existing repositories and
    repository groups.

    :param edit: True when an existing repository is edited
    :param old_data: data of the edited repository, only ``repo_name`` is
        read
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_repo_name':
                _(u'Repository name %(repo)s is disallowed'),
            'repository_exists':
                _(u'Repository named %(repo)s already exists'),
            'repository_in_group_exists': _(u'Repository "%(repo)s" already '
                                            'exists in group "%(group)s"'),
            'same_group_exists': _(u'Repository group with name "%(repo)s" '
                                   'already exists')
        }

        def _to_python(self, value, state):
            repo_name = repo_name_slug(value.get('repo_name', ''))

            # defaults for a repository outside of any group
            group_path = group_name = ''
            repo_name_full = repo_name

            group_id = value.get('repo_group')
            if group_id:
                group = RepoGroup.get(group_id)
                group_path = group.full_path
                group_name = group.group_name
                # the full name is prefixed with the path of the group
                repo_name_full = group_path + RepoGroup.url_sep() + repo_name

            value['repo_name'] = repo_name
            value['repo_name_full'] = repo_name_full
            value['group_path'] = group_path
            value['group_name'] = group_name
            return value

        def validate_python(self, value, state):
            def fail(key, **params):
                # every failure is reported on the repo_name field
                msg = M(self, key, state, **params)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'repo_name': msg})

            repo_name = value.get('repo_name')
            repo_name_full = value.get('repo_name_full')
            group_path = value.get('group_path')
            group_name = value.get('group_name')

            if repo_name in [ADMIN_PREFIX, '']:
                fail('invalid_repo_name', repo=repo_name)

            renamed = old_data.get('repo_name') != repo_name_full
            if renamed or not edit:
                if group_path != '':
                    if Repository.get_by_repo_name(repo_name_full):
                        fail('repository_in_group_exists',
                             repo=repo_name, group=group_name)
                elif RepoGroup.get_by_group_name(repo_name_full):
                    fail('same_group_exists', repo=repo_name)
                elif Repository.get_by_repo_name(repo_name_full):
                    fail('repository_exists', repo=repo_name)
            return value

    return _validator
def ValidForkName(*args, **kwargs):
    """Fork names are validated exactly like repository names."""
    validator_class = ValidRepoName(*args, **kwargs)
    return validator_class
def SlugifyName():
    """
    Build a validator class that converts its value with
    ``repo_name_slug`` and performs no further validation.
    """
    class _validator(formencode.validators.FancyValidator):

        def _to_python(self, value, state):
            slug = repo_name_slug(value)
            return slug

        def validate_python(self, value, state):
            # nothing to check, the conversion is all this validator does
            pass

    return _validator
def ValidCloneUri():
    """
    Build a validator class for the clone uri of a repository form.

    ``repo_type`` and ``clone_uri`` are read from the validated dict. An
    empty clone uri is accepted; any exception raised while checking a
    non empty one is logged and reported as an invalid ``clone_uri``.
    """
    from rhodecode.lib.utils import make_ui

    def url_handler(repo_type, url, ui):
        """
        Check ``url`` for a repository of type ``repo_type``.

        Raises when the url scheme is not allowed or not implemented for
        the repository type, or when the backend rejects the url.
        NOTE(review): a repo_type other than 'hg' or 'git' is not checked
        at all.
        """
        if repo_type == 'hg':
            from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
            if url.startswith('http'):
                # initially check if it's at least the proper URL
                # or does it pass basic auth
                MercurialRepository._check_url(url, ui)
            elif url.startswith('svn+http'):
                from hgsubversion.svnrepo import svnremoterepo
                # only the attribute access matters here, its value is unused
                svnremoterepo(ui, url).svn.uuid
            elif url.startswith('git+http'):
                raise NotImplementedError()
            else:
                raise Exception('clone from URI %s not allowed' % (url,))

        elif repo_type == 'git':
            from rhodecode.lib.vcs.backends.git.repository import GitRepository
            if url.startswith('http'):
                # initially check if it's at least the proper URL
                # or does it pass basic auth
                GitRepository._check_url(url)
            elif url.startswith('svn+http'):
                raise NotImplementedError()
            elif url.startswith('hg+http'):
                raise NotImplementedError()
            else:
                raise Exception('clone from URI %s not allowed' % (url,))

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'clone_uri': _(u'invalid clone url'),
            'invalid_clone_uri': _(u'Invalid clone url, provide a '
                                    'valid clone http(s)/svn+http(s) url')
        }

        def validate_python(self, value, state):
            repo_type = value.get('repo_type')
            url = value.get('clone_uri')

            if not url:
                # no clone uri given, nothing to validate
                return

            try:
                url_handler(repo_type, url, make_ui('db', clear_session=False))
            except Exception:
                log.exception('Url validation failed')
                # pass the state on like all the other validators do
                msg = M(self, 'clone_uri', state)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'clone_uri': msg})

    return _validator
def ValidForkType(old_data={}):
    """
    Build a validator class that accepts only the repository type stored
    in ``old_data['repo_type']``.

    :param old_data: data of the forked repository
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_fork_type': _(u'Fork have to be the same type as parent')
        }

        def validate_python(self, value, state):
            parent_type = old_data['repo_type']
            if parent_type != value:
                msg = M(self, 'invalid_fork_type', state)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'repo_type': msg})

    return _validator
def CanWriteGroup(old_data=None):
    """
    Build a validator class that checks whether a repository may be put
    into the chosen repository group (or into the root location).

    :param old_data: data of the edited repository or ``None`` for a new
        one; ``old_data['repo_group']['group_id']`` is compared with the
        submitted group to detect a change
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'permission_denied': _(u"You don't have permissions "
                                   "to create repository in this group"),
            'permission_denied_root': _(u"no permission to create repository "
                                        "in root location")
        }

        def _to_python(self, value, state):
            # -1 stands for the root location which is passed on as None
            is_root = value in [-1, "-1"]
            return None if is_root else value

        def validate_python(self, value, state):
            def fail(key):
                msg = M(self, key, state)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'repo_type': msg})

            group = RepoGroup.get(value)
            # None means ROOT location
            group_name = group.group_name if group else None

            # create repositories with write permission on group is set to true
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
            is_group_admin = HasRepoGroupPermissionAny('group.admin')(
                group_name, 'can write into group validator')
            is_group_writer = HasRepoGroupPermissionAny('group.write')(
                group_name, 'can write into group validator')
            allowed = is_group_admin or (is_group_writer and create_on_write)
            forbidden = not allowed
            # evaluated only when the root location has to be checked
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')

            old_group_id = None
            if old_data and 'repo_group' in old_data:
                old_group_id = old_data['repo_group'].get('group_id')
            value_changed = old_group_id != safe_int(value)
            is_new = not old_data

            # do check if we changed the value, there's a case that someone got
            # revoked write permissions to a repository, he still created, we
            # don't need to check permission if he didn't change the value of
            # groups in form box
            if not (value_changed or is_new):
                return

            if group and forbidden:
                # parent group need to be existing
                fail('permission_denied')
            elif group is None and not can_create_repos():
                # check if we can write to root location !
                fail('permission_denied_root')

    return _validator
def CanCreateGroup(can_create_in_root=False):
    """
    Build a validator class that checks whether a repository group may be
    created below the chosen parent group (or in the root location).

    :param can_create_in_root: True when creating groups in the root
        location is permitted without further checks
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'permission_denied': _(u"You don't have permissions "
                                   "to create a group in this location")
        }

        # NOTE(review): this overrides to_python() itself, not
        # _to_python() as CanWriteGroup does -- confirm that
        # validate_python() is still invoked for this validator.
        def to_python(self, value, state):
            # -1 stands for the root location which is passed on as None
            is_root = value in [-1, "-1"]
            return None if is_root else value

        def validate_python(self, value, state):
            group = RepoGroup.get(value)
            # None means ROOT location
            group_name = group.group_name if group else None
            in_root = group is None

            if in_root and can_create_in_root:
                # we can create in root, we're fine no validations required
                return

            is_admin = HasRepoGroupPermissionAny('group.admin')(
                group_name, 'can create group validator')
            # in_root at this point implies that creating in root is not
            # permitted, the early return above handled the other case
            if in_root or not is_admin:
                msg = M(self, 'permission_denied', state)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'group_parent_id': msg})

    return _validator
def ValidPerms(type_='repo'):
    """
    Build a validator class that collects permission changes from a
    permission form.

    The conversion adds two keys to the form dict, ``perms_updates`` and
    ``perms_new``; both are lists of ``(member, permission, member type)``
    tuples where the member type is ``'user'`` or ``'users_group'`` for
    updates and whatever the form submitted for new members.

    :param type_: ``'repo'``, ``'repo_group'`` or ``'user_group'``; selects
        the empty permission that is forced on the default user when the
        form has a true ``repo_private`` value
    """
    # NOTE(review): any other type_ leaves EMPTY_PERM undefined
    if type_ == 'repo_group':
        EMPTY_PERM = 'group.none'
    elif type_ == 'repo':
        EMPTY_PERM = 'repository.none'
    elif type_ == 'user_group':
        EMPTY_PERM = 'usergroup.none'

    class _validator(formencode.validators.FancyValidator):
        messages = {
            'perm_new_member_name':
                _(u'This username or user group name is not valid')
        }

        def to_python(self, value, state):
            perms_update = OrderedSet()
            perms_new = OrderedSet()
            # build a list of permission to update and new permission to create

            # clean out the new member keys from the original value and
            # group them by their position in the form; a snapshot is
            # iterated because keys are deleted inside the loop
            new_perms_group = defaultdict(dict)
            for k, v in list(value.items()):
                if k.startswith('perm_new_member'):
                    del value[k]
                    _type, part = k.split('perm_new_member_')
                    args = part.split('_')
                    if len(args) == 1:
                        # perm_new_member_<pos> holds the permission
                        new_perms_group[args[0]]['perm'] = v
                    elif len(args) == 2:
                        # perm_new_member_<key>_<pos>
                        _key, pos = args
                        new_perms_group[pos][_key] = v

            # fill new permissions in order of how they were added
            for k in sorted(map(int, new_perms_group.keys())):
                perm_dict = new_perms_group[str(k)]
                new_member = perm_dict.get('name')
                new_perm = perm_dict.get('perm')
                new_type = perm_dict.get('type')
                if new_member and new_perm and new_type:
                    perms_new.add((new_member, new_perm, new_type))

            for k, v in value.items():
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
                    member = k[7:]
                    t = {'u': 'user',
                         'g': 'users_group'
                         }[k[0]]
                    if member == User.DEFAULT_USER:
                        if str2bool(value.get('repo_private')):
                            # set none for default when updating to
                            # private repo protects against form manipulation
                            v = EMPTY_PERM
                    perms_update.add((member, v, t))

            value['perms_updates'] = list(perms_update)
            value['perms_new'] = list(perms_new)

            # every new member has to be an active user / user group
            for k, v, t in perms_new:
                try:
                    # compare by value: the type comes from the submitted
                    # form, an identity test against a literal is unreliable
                    if t == 'user':
                        self.user_db = User.query()\
                            .filter(User.active == True)\
                            .filter(User.username == k).one()
                    if t == 'users_group':
                        self.user_db = UserGroup.query()\
                            .filter(UserGroup.users_group_active == True)\
                            .filter(UserGroup.users_group_name == k).one()

                except Exception:
                    log.exception('Updated permission failed')
                    # the key has to be one that exists in messages
                    msg = M(self, 'perm_new_member_name', state)
                    raise formencode.Invalid(msg, value, state,
                                             error_dict={'perm_new_member_name': msg})
            return value

    return _validator
def ValidSettings():
    """
    Build a validator class that removes parameters from the settings
    form which must not be changed through it.
    """
    class _validator(formencode.validators.FancyValidator):

        def _to_python(self, value, state):
            # settings form for users that are not admin
            # can't edit certain parameters, it's extra backup if they mangle
            # with forms
            forbidden_params = (
                'user', 'repo_type', 'repo_enable_locking',
                'repo_enable_downloads', 'repo_enable_statistics',
            )
            submitted = [name for name in forbidden_params if name in value]
            for name in submitted:
                del value[name]
            return value

        def validate_python(self, value, state):
            pass

    return _validator
def ValidPath():
    """
    Build a validator class that accepts only paths of existing
    directories.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'invalid_path': _(u'This is not a valid path')
        }

        def validate_python(self, value, state):
            if os.path.isdir(value):
                return
            msg = M(self, 'invalid_path', state)
            raise formencode.Invalid(msg, value, state,
                                     error_dict={'paths_root_path': msg})

    return _validator
def UniqSystemEmail(old_data={}):
    """
    Build a validator class that lower cases an e-mail address and rejects
    it when another user already has it.

    :param old_data: data of the edited user; its ``email`` is accepted
        without a lookup
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'email_taken': _(u'This e-mail address is already taken')
        }

        def _to_python(self, value, state):
            return value.lower()

        def validate_python(self, value, state):
            previous = (old_data.get('email') or '').lower()
            if previous == value:
                # address did not change, nothing to check
                return
            if User.get_by_email(value, case_insensitive=True):
                msg = M(self, 'email_taken', state)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'email': msg})

    return _validator
def ValidSystemEmail():
    """
    Build a validator class that lower cases an e-mail address and accepts
    it only when a user with that address is found.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
        }

        def _to_python(self, value, state):
            return value.lower()

        def validate_python(self, value, state):
            owner = User.get_by_email(value, case_insensitive=True)
            if owner is not None:
                return
            msg = M(self, 'non_existing_email', state, email=value)
            raise formencode.Invalid(msg, value, state,
                                     error_dict={'email': msg})

    return _validator
def LdapLibValidator():
    """
    Build a validator class that ignores its value and only checks that
    the ``ldap`` module can be imported.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {}

        def validate_python(self, value, state):
            try:
                import ldap  # noqa: only the import itself is of interest
            except ImportError:
                raise LdapImportError()

    return _validator
def AttrLoginValidator():
    """
    Build a ``UnicodeString`` validator class for the LDAP login
    attribute; an empty value is reported with the ``invalid_cn`` text.
    """
    invalid_cn = _(u'The LDAP Login attribute of the CN must be specified - '
                   'this is the name of the attribute that is equivalent '
                   'to "username"')

    class _validator(formencode.validators.UnicodeString):
        # 'empty' reuses the text of 'invalid_cn'
        messages = {
            'invalid_cn': invalid_cn,
            'empty': invalid_cn,
        }

    return _validator
r2719 | ||||
def NotReviewedRevisions(repo_id):
    """
    Build a validator class that rejects revisions which already have a
    changeset status entry with a pull request or a status set.

    :param repo_id: id of the repository the revisions belong to
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'rev_already_reviewed':
                _(u'Revisions %(revs)s are already part of pull request '
                  'or have set status')
        }

        def validate_python(self, value, state):
            # check revisions if they are not reviewed, or a part of another
            # pull request
            statuses = ChangesetStatus.query()\
                .filter(ChangesetStatus.revision.in_(value))\
                .filter(ChangesetStatus.repo_id == repo_id)\
                .all()

            # short (12 characters) ids of all offending revisions
            reviewed = [cs.revision[:12] for cs in statuses
                        if cs.pull_request_id or cs.status]
            if not reviewed:
                return

            revs = ','.join(reviewed)
            msg = M(self, 'rev_already_reviewed', state, revs=revs)
            raise formencode.Invalid(msg, value, state,
                                     error_dict={'revisions': revs})

    return _validator
r3125 | ||||
def ValidIp():
    """
    Build a ``CIDR`` based validator class for IPv4 / IPv6 addresses and
    networks; addresses given without a mask get ``/32`` (IPv4) or
    ``/128`` (IPv6) appended.
    """
    class _validator(CIDR):
        messages = {
            'badFormat': _('Please enter a valid IPv4 or IpV6 address'),
            'illegalBits': _('The network size (bits) must be within the range'
                             ' of 0-32 (not %(bits)r)'),
        }

        def to_python(self, value, state):
            address = super(_validator, self).to_python(value, state)
            address = address.strip()
            network = ipaddr.IPNetwork(address=address)
            # default mask per address family, used when the input has none
            default_masks = (
                (ipaddr.IPv4Network, '/32'),
                (ipaddr.IPv6Network, '/128'),
            )
            for network_class, mask in default_masks:
                if isinstance(network, network_class) and '/' not in value:
                    address += mask
            return address

        def validate_python(self, value, state):
            try:
                # this raises an ValueError if address is not IpV4 or IpV6
                ipaddr.IPNetwork(address=value.strip())
            except ValueError:
                raise formencode.Invalid(self.message('badFormat', state),
                                         value, state)

    return _validator
r3308 | ||||
def FieldKey():
    """
    Build a validator class for key names made of letters, digits,
    underscores and dashes.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'badFormat': _('Key name can only consist of letters, '
                           'underscore, dash or numbers'),
        }

        def validate_python(self, value, state):
            if re.match('[a-zA-Z0-9_-]+$', value):
                return
            raise formencode.Invalid(self.message('badFormat', state),
                                     value, state)

    return _validator
r3846 | ||||
def BasePath():
    """
    Build a validator class that accepts only plain file names, i.e.
    values equal to their own ``os.path.basename``.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = {
            'badPath': _('Filename cannot be inside a directory'),
        }

        def _to_python(self, value, state):
            return value

        def validate_python(self, value, state):
            if os.path.basename(value) == value:
                return
            raise formencode.Invalid(self.message('badPath', state),
                                     value, state)

    return _validator
Bradley M. Kuhn
|
r4116 | |||
def ValidAuthPlugins():
    """
    Build a validator class for a list of authentication plugin modules.

    Empty entries are dropped; every remaining module is loaded with
    ``auth_modules.loadplugin`` and two plugins must not have the same
    ``name``.
    """
    class _validator(formencode.validators.FancyValidator):
        messages = dict(
            import_duplicate=_('Plugins %(loaded)s and %(next_to_load)s both export the same name')
        )

        def _to_python(self, value, state):
            # filter empty values; a real list is built because the result
            # is iterated and passed around afterwards
            return [module for module in value if module not in [None, '']]

        def validate_python(self, value, state):
            from rhodecode.lib import auth_modules

            # plugin name -> plugin that was loaded under that name
            unique_names = {}
            try:
                for module in value:
                    plugin = auth_modules.loadplugin(module)
                    plugin_name = plugin.name
                    if plugin_name in unique_names:
                        msg = M(self, 'import_duplicate', state,
                                loaded=unique_names[plugin_name],
                                next_to_load=plugin_name)
                        raise formencode.Invalid(msg, value, state)
                    unique_names[plugin_name] = plugin
            except (ImportError, AttributeError, TypeError) as e:
                # a plugin that cannot be loaded makes the value invalid
                raise formencode.Invalid(str(e), value, state)

    return _validator