|
|
""" this is forms validation classes
|
|
|
http://formencode.org/module-formencode.validators.html
|
|
|
for list off all availible validators
|
|
|
|
|
|
we can create our own validators
|
|
|
|
|
|
The table below outlines the options which can be used in a schema in addition to the validators themselves
|
|
|
pre_validators [] These validators will be applied before the schema
|
|
|
chained_validators [] These validators will be applied after the schema
|
|
|
allow_extra_fields False If True, then it is not an error when keys that aren't associated with a validator are present
|
|
|
filter_extra_fields False If True, then keys that aren't associated with a validator are removed
|
|
|
if_key_missing NoDefault If this is given, then any keys that aren't available but are expected will be replaced with this value (and then validated). This does not override a present .if_missing attribute on validators. NoDefault is a special FormEncode class to mean that no default values has been specified and therefore missing keys shouldn't take a default value.
|
|
|
ignore_key_missing False If True, then missing keys will be missing in the result, if the validator doesn't have .if_missing on it already
|
|
|
|
|
|
|
|
|
<name> = formencode.validators.<name of validator>
|
|
|
<name> must equal form name
|
|
|
list=[1,2,3,4,5]
|
|
|
for SELECT use formencode.All(OneOf(list), Int())
|
|
|
|
|
|
"""
|
|
|
import os
|
|
|
import re
|
|
|
import logging
|
|
|
import traceback
|
|
|
|
|
|
import formencode
|
|
|
from formencode import All
|
|
|
from formencode.validators import UnicodeString, OneOf, Int, Number, Regex, \
|
|
|
Email, Bool, StringBoolean, Set
|
|
|
|
|
|
from pylons.i18n.translation import _
|
|
|
from webhelpers.pylonslib.secure_form import authentication_token
|
|
|
|
|
|
from rhodecode.config.routing import ADMIN_PREFIX
|
|
|
from rhodecode.lib.utils import repo_name_slug
|
|
|
from rhodecode.lib.auth import authenticate, get_crypt_password
|
|
|
from rhodecode.lib.exceptions import LdapImportError
|
|
|
from rhodecode.model.user import UserModel
|
|
|
from rhodecode.model.repo import RepoModel
|
|
|
from rhodecode.model.db import User, UsersGroup, Group, Repository
|
|
|
from rhodecode import BACKENDS
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
#this is needed to translate the messages using _() in validators
|
|
|
class State_obj(object):
|
|
|
_ = staticmethod(_)
|
|
|
|
|
|
#==============================================================================
|
|
|
# VALIDATORS
|
|
|
#==============================================================================
|
|
|
class ValidAuthToken(formencode.validators.FancyValidator):
|
|
|
messages = {'invalid_token':_('Token mismatch')}
|
|
|
|
|
|
def validate_python(self, value, state):
|
|
|
|
|
|
if value != authentication_token():
|
|
|
raise formencode.Invalid(self.message('invalid_token', state,
|
|
|
search_number=value), value, state)
|
|
|
|
|
|
def ValidUsername(edit, old_data):
|
|
|
class _ValidUsername(formencode.validators.FancyValidator):
|
|
|
|
|
|
def validate_python(self, value, state):
|
|
|
if value in ['default', 'new_user']:
|
|
|
raise formencode.Invalid(_('Invalid username'), value, state)
|
|
|
#check if user is unique
|
|
|
old_un = None
|
|
|
if edit:
|
|
|
old_un = UserModel().get(old_data.get('user_id')).username
|
|
|
|
|
|
if old_un != value or not edit:
|
|
|
if User.get_by_username(value, case_insensitive=True):
|
|
|
raise formencode.Invalid(_('This username already '
|
|
|
'exists') , value, state)
|
|
|
|
|
|
if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
|
|
|
raise formencode.Invalid(_('Username may only contain '
|
|
|
'alphanumeric characters '
|
|
|
'underscores, periods or dashes '
|
|
|
'and must begin with alphanumeric '
|
|
|
'character'), value, state)
|
|
|
|
|
|
return _ValidUsername
|
|
|
|
|
|
|
|
|
def ValidUsersGroup(edit, old_data):
|
|
|
|
|
|
class _ValidUsersGroup(formencode.validators.FancyValidator):
|
|
|
|
|
|
def validate_python(self, value, state):
|
|
|
if value in ['default']:
|
|
|
raise formencode.Invalid(_('Invalid group name'), value, state)
|
|
|
#check if group is unique
|
|
|
old_ugname = None
|
|
|
if edit:
|
|
|
old_ugname = UsersGroup.get(
|
|
|
old_data.get('users_group_id')).users_group_name
|
|
|
|
|
|
if old_ugname != value or not edit:
|
|
|
if UsersGroup.get_by_group_name(value, cache=False,
|
|
|
case_insensitive=True):
|
|
|
raise formencode.Invalid(_('This users group '
|
|
|
'already exists') , value,
|
|
|
state)
|
|
|
|
|
|
|
|
|
if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
|
|
|
raise formencode.Invalid(_('Group name may only contain '
|
|
|
'alphanumeric characters '
|
|
|
'underscores, periods or dashes '
|
|
|
'and must begin with alphanumeric '
|
|
|
'character'), value, state)
|
|
|
|
|
|
return _ValidUsersGroup
|
|
|
|
|
|
|
|
|
def ValidReposGroup(edit, old_data):
|
|
|
class _ValidReposGroup(formencode.validators.FancyValidator):
|
|
|
|
|
|
def validate_python(self, value, state):
|
|
|
# TODO WRITE VALIDATIONS
|
|
|
group_name = value.get('group_name')
|
|
|
group_parent_id = value.get('group_parent_id')
|
|
|
|
|
|
# slugify repo group just in case :)
|
|
|
slug = repo_name_slug(group_name)
|
|
|
|
|
|
# check for parent of self
|
|
|
parent_of_self = lambda: (
|
|
|
old_data['group_id'] == int(group_parent_id)
|
|
|
if group_parent_id else False
|
|
|
)
|
|
|
if edit and parent_of_self():
|
|
|
e_dict = {
|
|
|
'group_parent_id': _('Cannot assign this group as parent')
|
|
|
}
|
|
|
raise formencode.Invalid('', value, state,
|
|
|
error_dict=e_dict)
|
|
|
|
|
|
old_gname = None
|
|
|
if edit:
|
|
|
old_gname = Group.get(
|
|
|
old_data.get('group_id')).group_name
|
|
|
|
|
|
if old_gname != group_name or not edit:
|
|
|
|
|
|
# check group
|
|
|
gr = Group.query()\
|
|
|
.filter(Group.group_name == slug)\
|
|
|
.filter(Group.group_parent_id == group_parent_id)\
|
|
|
.scalar()
|
|
|
|
|
|
if gr:
|
|
|
e_dict = {
|
|
|
'group_name': _('This group already exists')
|
|
|
}
|
|
|
raise formencode.Invalid('', value, state,
|
|
|
error_dict=e_dict)
|
|
|
|
|
|
# check for same repo
|
|
|
repo = Repository.query()\
|
|
|
.filter(Repository.repo_name == slug)\
|
|
|
.scalar()
|
|
|
|
|
|
if repo:
|
|
|
e_dict = {
|
|
|
'group_name': _('Repository with this name already exists')
|
|
|
}
|
|
|
raise formencode.Invalid('', value, state,
|
|
|
error_dict=e_dict)
|
|
|
|
|
|
return _ValidReposGroup
|
|
|
|
|
|
class ValidPassword(formencode.validators.FancyValidator):
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
|
|
|
if value:
|
|
|
|
|
|
if value.get('password'):
|
|
|
try:
|
|
|
value['password'] = get_crypt_password(value['password'])
|
|
|
except UnicodeEncodeError:
|
|
|
e_dict = {'password':_('Invalid characters in password')}
|
|
|
raise formencode.Invalid('', value, state, error_dict=e_dict)
|
|
|
|
|
|
if value.get('password_confirmation'):
|
|
|
try:
|
|
|
value['password_confirmation'] = \
|
|
|
get_crypt_password(value['password_confirmation'])
|
|
|
except UnicodeEncodeError:
|
|
|
e_dict = {'password_confirmation':_('Invalid characters in password')}
|
|
|
raise formencode.Invalid('', value, state, error_dict=e_dict)
|
|
|
|
|
|
if value.get('new_password'):
|
|
|
try:
|
|
|
value['new_password'] = \
|
|
|
get_crypt_password(value['new_password'])
|
|
|
except UnicodeEncodeError:
|
|
|
e_dict = {'new_password':_('Invalid characters in password')}
|
|
|
raise formencode.Invalid('', value, state, error_dict=e_dict)
|
|
|
|
|
|
return value
|
|
|
|
|
|
class ValidPasswordsMatch(formencode.validators.FancyValidator):
|
|
|
|
|
|
def validate_python(self, value, state):
|
|
|
|
|
|
pass_val = value.get('password') or value.get('new_password')
|
|
|
if pass_val != value['password_confirmation']:
|
|
|
e_dict = {'password_confirmation':
|
|
|
_('Passwords do not match')}
|
|
|
raise formencode.Invalid('', value, state, error_dict=e_dict)
|
|
|
|
|
|
class ValidAuth(formencode.validators.FancyValidator):
|
|
|
messages = {
|
|
|
'invalid_password':_('invalid password'),
|
|
|
'invalid_login':_('invalid user name'),
|
|
|
'disabled_account':_('Your account is disabled')
|
|
|
}
|
|
|
|
|
|
# error mapping
|
|
|
e_dict = {'username':messages['invalid_login'],
|
|
|
'password':messages['invalid_password']}
|
|
|
e_dict_disable = {'username':messages['disabled_account']}
|
|
|
|
|
|
def validate_python(self, value, state):
|
|
|
password = value['password']
|
|
|
username = value['username']
|
|
|
user = User.get_by_username(username)
|
|
|
|
|
|
if authenticate(username, password):
|
|
|
return value
|
|
|
else:
|
|
|
if user and user.active is False:
|
|
|
log.warning('user %s is disabled', username)
|
|
|
raise formencode.Invalid(self.message('disabled_account',
|
|
|
state=State_obj),
|
|
|
value, state,
|
|
|
error_dict=self.e_dict_disable)
|
|
|
else:
|
|
|
log.warning('user %s not authenticated', username)
|
|
|
raise formencode.Invalid(self.message('invalid_password',
|
|
|
state=State_obj), value, state,
|
|
|
error_dict=self.e_dict)
|
|
|
|
|
|
class ValidRepoUser(formencode.validators.FancyValidator):
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
try:
|
|
|
User.query().filter(User.active == True)\
|
|
|
.filter(User.username == value).one()
|
|
|
except Exception:
|
|
|
raise formencode.Invalid(_('This username is not valid'),
|
|
|
value, state)
|
|
|
return value
|
|
|
|
|
|
def ValidRepoName(edit, old_data):
|
|
|
class _ValidRepoName(formencode.validators.FancyValidator):
|
|
|
def to_python(self, value, state):
|
|
|
|
|
|
repo_name = value.get('repo_name')
|
|
|
|
|
|
slug = repo_name_slug(repo_name)
|
|
|
if slug in [ADMIN_PREFIX, '']:
|
|
|
e_dict = {'repo_name': _('This repository name is disallowed')}
|
|
|
raise formencode.Invalid('', value, state, error_dict=e_dict)
|
|
|
|
|
|
|
|
|
if value.get('repo_group'):
|
|
|
gr = Group.get(value.get('repo_group'))
|
|
|
group_path = gr.full_path
|
|
|
# value needs to be aware of group name in order to check
|
|
|
# db key This is an actual just the name to store in the
|
|
|
# database
|
|
|
repo_name_full = group_path + Group.url_sep() + repo_name
|
|
|
|
|
|
else:
|
|
|
group_path = ''
|
|
|
repo_name_full = repo_name
|
|
|
|
|
|
|
|
|
value['repo_name_full'] = repo_name_full
|
|
|
rename = old_data.get('repo_name') != repo_name_full
|
|
|
create = not edit
|
|
|
if rename or create:
|
|
|
|
|
|
if group_path != '':
|
|
|
if RepoModel().get_by_repo_name(repo_name_full,):
|
|
|
e_dict = {'repo_name':_('This repository already '
|
|
|
'exists in a group "%s"') %
|
|
|
gr.group_name}
|
|
|
raise formencode.Invalid('', value, state,
|
|
|
error_dict=e_dict)
|
|
|
elif Group.get_by_group_name(repo_name_full):
|
|
|
e_dict = {'repo_name':_('There is a group with this'
|
|
|
' name already "%s"') %
|
|
|
repo_name_full}
|
|
|
raise formencode.Invalid('', value, state,
|
|
|
error_dict=e_dict)
|
|
|
|
|
|
elif RepoModel().get_by_repo_name(repo_name_full):
|
|
|
e_dict = {'repo_name':_('This repository '
|
|
|
'already exists')}
|
|
|
raise formencode.Invalid('', value, state,
|
|
|
error_dict=e_dict)
|
|
|
|
|
|
return value
|
|
|
|
|
|
return _ValidRepoName
|
|
|
|
|
|
def ValidForkName():
|
|
|
class _ValidForkName(formencode.validators.FancyValidator):
|
|
|
def to_python(self, value, state):
|
|
|
|
|
|
repo_name = value.get('fork_name')
|
|
|
|
|
|
slug = repo_name_slug(repo_name)
|
|
|
if slug in [ADMIN_PREFIX, '']:
|
|
|
e_dict = {'repo_name': _('This repository name is disallowed')}
|
|
|
raise formencode.Invalid('', value, state, error_dict=e_dict)
|
|
|
|
|
|
if RepoModel().get_by_repo_name(repo_name):
|
|
|
e_dict = {'fork_name':_('This repository '
|
|
|
'already exists')}
|
|
|
raise formencode.Invalid('', value, state,
|
|
|
error_dict=e_dict)
|
|
|
return value
|
|
|
return _ValidForkName
|
|
|
|
|
|
|
|
|
def SlugifyName():
|
|
|
class _SlugifyName(formencode.validators.FancyValidator):
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
return repo_name_slug(value)
|
|
|
|
|
|
return _SlugifyName
|
|
|
|
|
|
def ValidCloneUri():
|
|
|
from mercurial.httprepo import httprepository, httpsrepository
|
|
|
from rhodecode.lib.utils import make_ui
|
|
|
|
|
|
class _ValidCloneUri(formencode.validators.FancyValidator):
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
if not value:
|
|
|
pass
|
|
|
elif value.startswith('https'):
|
|
|
try:
|
|
|
httpsrepository(make_ui('db'), value).capabilities
|
|
|
except Exception, e:
|
|
|
log.error(traceback.format_exc())
|
|
|
raise formencode.Invalid(_('invalid clone url'), value,
|
|
|
state)
|
|
|
elif value.startswith('http'):
|
|
|
try:
|
|
|
httprepository(make_ui('db'), value).capabilities
|
|
|
except Exception, e:
|
|
|
log.error(traceback.format_exc())
|
|
|
raise formencode.Invalid(_('invalid clone url'), value,
|
|
|
state)
|
|
|
else:
|
|
|
raise formencode.Invalid(_('Invalid clone url, provide a '
|
|
|
'valid clone http\s url'), value,
|
|
|
state)
|
|
|
return value
|
|
|
|
|
|
return _ValidCloneUri
|
|
|
|
|
|
def ValidForkType(old_data):
|
|
|
class _ValidForkType(formencode.validators.FancyValidator):
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
if old_data['repo_type'] != value:
|
|
|
raise formencode.Invalid(_('Fork have to be the same '
|
|
|
'type as original'), value, state)
|
|
|
|
|
|
return value
|
|
|
return _ValidForkType
|
|
|
|
|
|
class ValidPerms(formencode.validators.FancyValidator):
|
|
|
messages = {'perm_new_member_name':_('This username or users group name'
|
|
|
' is not valid')}
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
perms_update = []
|
|
|
perms_new = []
|
|
|
#build a list of permission to update and new permission to create
|
|
|
for k, v in value.items():
|
|
|
#means new added member to permissions
|
|
|
if k.startswith('perm_new_member'):
|
|
|
new_perm = value.get('perm_new_member', False)
|
|
|
new_member = value.get('perm_new_member_name', False)
|
|
|
new_type = value.get('perm_new_member_type')
|
|
|
|
|
|
if new_member and new_perm:
|
|
|
if (new_member, new_perm, new_type) not in perms_new:
|
|
|
perms_new.append((new_member, new_perm, new_type))
|
|
|
elif k.startswith('u_perm_') or k.startswith('g_perm_'):
|
|
|
member = k[7:]
|
|
|
t = {'u':'user',
|
|
|
'g':'users_group'}[k[0]]
|
|
|
if member == 'default':
|
|
|
if value['private']:
|
|
|
#set none for default when updating to private repo
|
|
|
v = 'repository.none'
|
|
|
perms_update.append((member, v, t))
|
|
|
|
|
|
value['perms_updates'] = perms_update
|
|
|
value['perms_new'] = perms_new
|
|
|
|
|
|
#update permissions
|
|
|
for k, v, t in perms_new:
|
|
|
try:
|
|
|
if t is 'user':
|
|
|
self.user_db = User.query()\
|
|
|
.filter(User.active == True)\
|
|
|
.filter(User.username == k).one()
|
|
|
if t is 'users_group':
|
|
|
self.user_db = UsersGroup.query()\
|
|
|
.filter(UsersGroup.users_group_active == True)\
|
|
|
.filter(UsersGroup.users_group_name == k).one()
|
|
|
|
|
|
except Exception:
|
|
|
msg = self.message('perm_new_member_name',
|
|
|
state=State_obj)
|
|
|
raise formencode.Invalid(msg, value, state,
|
|
|
error_dict={'perm_new_member_name':msg})
|
|
|
return value
|
|
|
|
|
|
class ValidSettings(formencode.validators.FancyValidator):
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
#settings form can't edit user
|
|
|
if value.has_key('user'):
|
|
|
del['value']['user']
|
|
|
|
|
|
return value
|
|
|
|
|
|
class ValidPath(formencode.validators.FancyValidator):
|
|
|
def to_python(self, value, state):
|
|
|
|
|
|
if not os.path.isdir(value):
|
|
|
msg = _('This is not a valid path')
|
|
|
raise formencode.Invalid(msg, value, state,
|
|
|
error_dict={'paths_root_path':msg})
|
|
|
return value
|
|
|
|
|
|
def UniqSystemEmail(old_data):
|
|
|
class _UniqSystemEmail(formencode.validators.FancyValidator):
|
|
|
def to_python(self, value, state):
|
|
|
value = value.lower()
|
|
|
if old_data.get('email') != value:
|
|
|
user = User.query().filter(User.email == value).scalar()
|
|
|
if user:
|
|
|
raise formencode.Invalid(
|
|
|
_("This e-mail address is already taken"),
|
|
|
value, state)
|
|
|
return value
|
|
|
|
|
|
return _UniqSystemEmail
|
|
|
|
|
|
class ValidSystemEmail(formencode.validators.FancyValidator):
|
|
|
def to_python(self, value, state):
|
|
|
value = value.lower()
|
|
|
user = User.query().filter(User.email == value).scalar()
|
|
|
if user is None:
|
|
|
raise formencode.Invalid(_("This e-mail address doesn't exist.") ,
|
|
|
value, state)
|
|
|
|
|
|
return value
|
|
|
|
|
|
class LdapLibValidator(formencode.validators.FancyValidator):
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
|
|
|
try:
|
|
|
import ldap
|
|
|
except ImportError:
|
|
|
raise LdapImportError
|
|
|
return value
|
|
|
|
|
|
class AttrLoginValidator(formencode.validators.FancyValidator):
|
|
|
|
|
|
def to_python(self, value, state):
|
|
|
|
|
|
if not value or not isinstance(value, (str, unicode)):
|
|
|
raise formencode.Invalid(_("The LDAP Login attribute of the CN "
|
|
|
"must be specified - this is the name "
|
|
|
"of the attribute that is equivalent "
|
|
|
"to 'username'"),
|
|
|
value, state)
|
|
|
|
|
|
return value
|
|
|
|
|
|
#===============================================================================
|
|
|
# FORMS
|
|
|
#===============================================================================
|
|
|
class LoginForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = True
|
|
|
username = UnicodeString(
|
|
|
strip=True,
|
|
|
min=1,
|
|
|
not_empty=True,
|
|
|
messages={
|
|
|
'empty':_('Please enter a login'),
|
|
|
'tooShort':_('Enter a value %(min)i characters long or more')}
|
|
|
)
|
|
|
|
|
|
password = UnicodeString(
|
|
|
strip=True,
|
|
|
min=3,
|
|
|
not_empty=True,
|
|
|
messages={
|
|
|
'empty':_('Please enter a password'),
|
|
|
'tooShort':_('Enter %(min)i characters or more')}
|
|
|
)
|
|
|
|
|
|
chained_validators = [ValidAuth]
|
|
|
|
|
|
def UserForm(edit=False, old_data={}):
|
|
|
class _UserForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = True
|
|
|
username = All(UnicodeString(strip=True, min=1, not_empty=True),
|
|
|
ValidUsername(edit, old_data))
|
|
|
if edit:
|
|
|
new_password = All(UnicodeString(strip=True, min=6, not_empty=False))
|
|
|
password_confirmation = All(UnicodeString(strip=True, min=6, not_empty=False))
|
|
|
admin = StringBoolean(if_missing=False)
|
|
|
else:
|
|
|
password = All(UnicodeString(strip=True, min=6, not_empty=True))
|
|
|
password_confirmation = All(UnicodeString(strip=True, min=6, not_empty=False))
|
|
|
|
|
|
active = StringBoolean(if_missing=False)
|
|
|
name = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
lastname = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
email = All(Email(not_empty=True), UniqSystemEmail(old_data))
|
|
|
|
|
|
chained_validators = [ValidPasswordsMatch, ValidPassword]
|
|
|
|
|
|
return _UserForm
|
|
|
|
|
|
|
|
|
def UsersGroupForm(edit=False, old_data={}, available_members=[]):
|
|
|
class _UsersGroupForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = True
|
|
|
|
|
|
users_group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
|
|
|
ValidUsersGroup(edit, old_data))
|
|
|
|
|
|
users_group_active = StringBoolean(if_missing=False)
|
|
|
|
|
|
if edit:
|
|
|
users_group_members = OneOf(available_members, hideList=False,
|
|
|
testValueList=True,
|
|
|
if_missing=None, not_empty=False)
|
|
|
|
|
|
return _UsersGroupForm
|
|
|
|
|
|
def ReposGroupForm(edit=False, old_data={}, available_groups=[]):
|
|
|
class _ReposGroupForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = True
|
|
|
|
|
|
group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
|
|
|
SlugifyName())
|
|
|
group_description = UnicodeString(strip=True, min=1,
|
|
|
not_empty=True)
|
|
|
group_parent_id = OneOf(available_groups, hideList=False,
|
|
|
testValueList=True,
|
|
|
if_missing=None, not_empty=False)
|
|
|
|
|
|
chained_validators = [ValidReposGroup(edit, old_data)]
|
|
|
|
|
|
return _ReposGroupForm
|
|
|
|
|
|
def RegisterForm(edit=False, old_data={}):
|
|
|
class _RegisterForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = True
|
|
|
username = All(ValidUsername(edit, old_data),
|
|
|
UnicodeString(strip=True, min=1, not_empty=True))
|
|
|
password = All(UnicodeString(strip=True, min=6, not_empty=True))
|
|
|
password_confirmation = All(UnicodeString(strip=True, min=6, not_empty=True))
|
|
|
active = StringBoolean(if_missing=False)
|
|
|
name = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
lastname = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
email = All(Email(not_empty=True), UniqSystemEmail(old_data))
|
|
|
|
|
|
chained_validators = [ValidPasswordsMatch, ValidPassword]
|
|
|
|
|
|
return _RegisterForm
|
|
|
|
|
|
def PasswordResetForm():
|
|
|
class _PasswordResetForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = True
|
|
|
email = All(ValidSystemEmail(), Email(not_empty=True))
|
|
|
return _PasswordResetForm
|
|
|
|
|
|
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
|
|
|
repo_groups=[]):
|
|
|
class _RepoForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = False
|
|
|
repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
|
|
|
SlugifyName())
|
|
|
clone_uri = All(UnicodeString(strip=True, min=1, not_empty=False),
|
|
|
ValidCloneUri()())
|
|
|
repo_group = OneOf(repo_groups, hideList=True)
|
|
|
repo_type = OneOf(supported_backends)
|
|
|
description = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
private = StringBoolean(if_missing=False)
|
|
|
enable_statistics = StringBoolean(if_missing=False)
|
|
|
enable_downloads = StringBoolean(if_missing=False)
|
|
|
|
|
|
if edit:
|
|
|
#this is repo owner
|
|
|
user = All(UnicodeString(not_empty=True), ValidRepoUser)
|
|
|
|
|
|
chained_validators = [ValidRepoName(edit, old_data), ValidPerms]
|
|
|
return _RepoForm
|
|
|
|
|
|
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
|
|
|
class _RepoForkForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = False
|
|
|
fork_name = All(UnicodeString(strip=True, min=1, not_empty=True),
|
|
|
SlugifyName())
|
|
|
description = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
private = StringBoolean(if_missing=False)
|
|
|
repo_type = All(ValidForkType(old_data), OneOf(supported_backends))
|
|
|
|
|
|
chained_validators = [ValidForkName()]
|
|
|
|
|
|
return _RepoForkForm
|
|
|
|
|
|
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
|
|
|
repo_groups=[]):
|
|
|
class _RepoForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = False
|
|
|
repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
|
|
|
SlugifyName())
|
|
|
description = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
repo_group = OneOf(repo_groups, hideList=True)
|
|
|
private = StringBoolean(if_missing=False)
|
|
|
|
|
|
chained_validators = [ValidRepoName(edit, old_data), ValidPerms,
|
|
|
ValidSettings]
|
|
|
return _RepoForm
|
|
|
|
|
|
|
|
|
def ApplicationSettingsForm():
|
|
|
class _ApplicationSettingsForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = False
|
|
|
rhodecode_title = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
rhodecode_realm = UnicodeString(strip=True, min=1, not_empty=True)
|
|
|
rhodecode_ga_code = UnicodeString(strip=True, min=1, not_empty=False)
|
|
|
|
|
|
return _ApplicationSettingsForm
|
|
|
|
|
|
def ApplicationUiSettingsForm():
|
|
|
class _ApplicationUiSettingsForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = False
|
|
|
web_push_ssl = OneOf(['true', 'false'], if_missing='false')
|
|
|
paths_root_path = All(ValidPath(), UnicodeString(strip=True, min=1, not_empty=True))
|
|
|
hooks_changegroup_update = OneOf(['True', 'False'], if_missing=False)
|
|
|
hooks_changegroup_repo_size = OneOf(['True', 'False'], if_missing=False)
|
|
|
hooks_pretxnchangegroup_push_logger = OneOf(['True', 'False'], if_missing=False)
|
|
|
hooks_preoutgoing_pull_logger = OneOf(['True', 'False'], if_missing=False)
|
|
|
|
|
|
return _ApplicationUiSettingsForm
|
|
|
|
|
|
def DefaultPermissionsForm(perms_choices, register_choices, create_choices):
|
|
|
class _DefaultPermissionsForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = True
|
|
|
overwrite_default = StringBoolean(if_missing=False)
|
|
|
anonymous = OneOf(['True', 'False'], if_missing=False)
|
|
|
default_perm = OneOf(perms_choices)
|
|
|
default_register = OneOf(register_choices)
|
|
|
default_create = OneOf(create_choices)
|
|
|
|
|
|
return _DefaultPermissionsForm
|
|
|
|
|
|
|
|
|
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices, tls_kind_choices):
|
|
|
class _LdapSettingsForm(formencode.Schema):
|
|
|
allow_extra_fields = True
|
|
|
filter_extra_fields = True
|
|
|
pre_validators = [LdapLibValidator]
|
|
|
ldap_active = StringBoolean(if_missing=False)
|
|
|
ldap_host = UnicodeString(strip=True,)
|
|
|
ldap_port = Number(strip=True,)
|
|
|
ldap_tls_kind = OneOf(tls_kind_choices)
|
|
|
ldap_tls_reqcert = OneOf(tls_reqcert_choices)
|
|
|
ldap_dn_user = UnicodeString(strip=True,)
|
|
|
ldap_dn_pass = UnicodeString(strip=True,)
|
|
|
ldap_base_dn = UnicodeString(strip=True,)
|
|
|
ldap_filter = UnicodeString(strip=True,)
|
|
|
ldap_search_scope = OneOf(search_scope_choices)
|
|
|
ldap_attr_login = All(AttrLoginValidator, UnicodeString(strip=True,))
|
|
|
ldap_attr_firstname = UnicodeString(strip=True,)
|
|
|
ldap_attr_lastname = UnicodeString(strip=True,)
|
|
|
ldap_attr_email = UnicodeString(strip=True,)
|
|
|
|
|
|
return _LdapSettingsForm
|
|
|
|