forms.py
449 lines
| 18.0 KiB
| text/x-python
|
PythonLexer
r547 | """ this is forms validation classes | |||
http://formencode.org/module-formencode.validators.html | ||||
for a list of all available validators | ||||
we can create our own validators | ||||
The table below outlines the options which can be used in a schema in addition to the validators themselves | ||||
pre_validators [] These validators will be applied before the schema | ||||
chained_validators [] These validators will be applied after the schema | ||||
allow_extra_fields False If True, then it is not an error when keys that aren't associated with a validator are present | ||||
filter_extra_fields False If True, then keys that aren't associated with a validator are removed | ||||
if_key_missing NoDefault If this is given, then any keys that aren't available but are expected will be replaced with this value (and then validated). This does not override a present .if_missing attribute on validators. NoDefault is a special FormEncode class to mean that no default value has been specified and therefore missing keys shouldn't take a default value. | ||||
ignore_key_missing False If True, then missing keys will be missing in the result, if the validator doesn't have .if_missing on it already | ||||
<name> = formencode.validators.<name of validator> | ||||
<name> must equal form name | ||||
list=[1,2,3,4,5] | ||||
for SELECT use formencode.All(OneOf(list), Int()) | ||||
""" | ||||
from formencode import All | ||||
from formencode.validators import UnicodeString, OneOf, Int, Number, Regex, \ | ||||
Email, Bool, StringBoolean | ||||
from pylons import session | ||||
from pylons.i18n.translation import _ | ||||
r699 | from rhodecode.lib.auth import authfunc, get_crypt_password | |||
r705 | from rhodecode.lib.exceptions import LdapImportError | |||
r547 | from rhodecode.model import meta | |||
r629 | from rhodecode.model.user import UserModel | |||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.db import User | ||||
r547 | from webhelpers.pylonslib.secure_form import authentication_token | |||
r710 | from rhodecode import BACKENDS | |||
r547 | import formencode | |||
import logging | ||||
import os | ||||
import rhodecode.lib.helpers as h | ||||
r629 | ||||
log = logging.getLogger(__name__)


class State_obj(object):
    """State object passed to validator ``message()`` calls.

    It exposes the translation function as ``_``; this is needed to
    translate the messages using _() in validators.
    """
    _ = staticmethod(_)
r629 | ||||
r547 | #=============================================================================== | |||
# VALIDATORS | ||||
#=============================================================================== | ||||
class ValidAuthToken(formencode.validators.FancyValidator):
    """Check a submitted value against ``authentication_token()``."""
    messages = {'invalid_token': _('Token mismatch')}

    def validate_python(self, value, state):
        """Raise ``formencode.Invalid`` when ``value`` is not the current token."""
        if value == authentication_token():
            return
        error_text = self.message('invalid_token', state, search_number=value)
        raise formencode.Invalid(error_text, value, state)
r629 | ||||
def ValidUsername(edit, old_data):
    """Return a validator class that checks a username.

    :param edit: truthy when an existing user is being edited; the user's
        current username (looked up through ``old_data['user_id']``) is then
        allowed to stay unchanged
    :param old_data: mapping read with ``.get('user_id')`` in edit mode
    """
    class _ValidUsername(formencode.validators.FancyValidator):

        def validate_python(self, value, state):
            # these two names are always rejected
            if value in ('default', 'new_user'):
                raise formencode.Invalid(_('Invalid username'), value, state)

            # in edit mode the user may keep the name he already has
            if edit:
                current_name = UserModel().get(old_data.get('user_id')).username
            else:
                current_name = None
            if edit and current_name == value:
                return

            # check if user is unique (case insensitive, bypassing the cache)
            taken = UserModel().get_by_username(value, cache=False,
                                                case_insensitive=True)
            if taken:
                raise formencode.Invalid(_('This username already exists') ,
                                         value, state)

    return _ValidUsername
class ValidPassword(formencode.validators.FancyValidator):
    """Replace the password fields of a form dict with their crypted form."""

    def to_python(self, value, state):
        """Crypt every non-empty password field of ``value`` in place.

        The fields ``password``, ``password_confirmation`` and
        ``new_password`` are passed through ``get_crypt_password``. A
        ``UnicodeEncodeError`` from that call is reported as
        ``formencode.Invalid`` with an ``error_dict`` keyed by the offending
        field. ``value`` is returned (unchanged when it is empty).
        """
        if not value:
            return value

        for field in ('password', 'password_confirmation', 'new_password'):
            plain = value.get(field)
            if not plain:
                continue
            try:
                value[field] = get_crypt_password(plain)
            except UnicodeEncodeError:
                e_dict = {field: _('Invalid characters in password')}
                raise formencode.Invalid('', value, state, error_dict=e_dict)

        return value
class ValidPasswordsMatch(formencode.validators.FancyValidator):
    """Require ``password`` and ``password_confirmation`` to be equal."""

    def validate_python(self, value, state):
        """Raise ``formencode.Invalid`` on ``password_confirmation`` when the two differ."""
        if value['password'] == value['password_confirmation']:
            return
        mismatch = {'password_confirmation': _('Password do not match')}
        raise formencode.Invalid('', value, state, error_dict=mismatch)
r629 | ||||
class ValidAuth(formencode.validators.FancyValidator):
    """Authenticate the ``username`` / ``password`` pair of a login form."""
    messages = {
        'invalid_password': _('invalid password'),
        'invalid_login': _('invalid user name'),
        'disabled_account': _('Your account is disabled'),
    }
    # error mapping: form field -> message shown next to it
    e_dict = {'username': messages['invalid_login'],
              'password': messages['invalid_password']}
    e_dict_disable = {'username': messages['disabled_account']}

    def validate_python(self, value, state):
        """Return ``value`` when ``authfunc`` accepts the credentials.

        Otherwise a warning is logged and ``formencode.Invalid`` is raised:
        with the ``disabled_account`` message when the user exists and its
        ``active`` attribute is ``False``, with ``invalid_password`` in every
        other case.
        """
        password = value['password']
        username = value['username']
        user = UserModel().get_by_username(username)

        if authfunc(None, username, password):
            return value

        if user and user.active is False:
            log.warning('user %s is disabled', username)
            message_key = 'disabled_account'
            field_errors = self.e_dict_disable
        else:
            log.warning('user %s not authenticated', username)
            message_key = 'invalid_password'
            field_errors = self.e_dict

        raise formencode.Invalid(self.message(message_key, state=State_obj),
                                 value, state, error_dict=field_errors)
r629 | ||||
class ValidRepoUser(formencode.validators.FancyValidator):
    """Convert the username of an active user into that user's ``user_id``."""

    def to_python(self, value, state):
        """Return the ``user_id`` of the active user named ``value``.

        Any exception raised by the lookup is turned into
        ``formencode.Invalid``. The matched row is kept on ``self.user_db``
        and ``meta.Session.remove()`` is always called afterwards.
        """
        session = meta.Session()
        try:
            active_users = session.query(User).filter(User.active == True)
            self.user_db = active_users.filter(User.username == value).one()
        except Exception:
            raise formencode.Invalid(_('This username is not valid'),
                                     value, state)
        finally:
            meta.Session.remove()

        return self.user_db.user_id
def ValidRepoName(edit, old_data):
    """Return a validator class that slugifies and checks a repository name.

    :param edit: truthy when an existing repository is being edited
    :param old_data: mapping read with ``.get('repo_name')``; an unchanged
        name skips the "already exists" check in edit mode
    """
    class _ValidRepoName(formencode.validators.FancyValidator):

        def to_python(self, value, state):
            # the validated result is the slug, not the raw input
            slug = h.repo_name_slug(value)
            if slug in ('_admin',):
                raise formencode.Invalid(_('This repository name is disallowed'),
                                         value, state)

            name_changed = old_data.get('repo_name') != value
            if (name_changed or not edit) and \
                    RepoModel().get_by_repo_name(slug, cache=False):
                raise formencode.Invalid(_('This repository already exists') ,
                                         value, state)
            return slug

    return _ValidRepoName
def ValidForkType(old_data):
    """Return a validator class accepting only ``old_data['repo_type']``.

    :param old_data: mapping holding the ``repo_type`` of the original repository
    """
    class _ValidForkType(formencode.validators.FancyValidator):

        def to_python(self, value, state):
            if old_data['repo_type'] == value:
                return value
            raise formencode.Invalid(_('Fork have to be the same type as original'),
                                     value, state)

    return _ValidForkType
class ValidPerms(formencode.validators.FancyValidator):
    """Collect the ``perm_*`` fields of a repository form into two lists."""
    messages = {'perm_new_user_name': _('This username is not valid')}

    def to_python(self, value, state):
        """Add ``perms_updates`` and ``perms_new`` to ``value`` and return it.

        ``perms_updates`` holds ``(user, permission)`` pairs taken from the
        ``perm_<user>`` keys; ``perms_new`` holds the
        ``(perm_new_user_name, perm_new_user)`` pair when both are set.
        Every user named in ``perms_new`` must match an active ``User`` row,
        otherwise ``formencode.Invalid`` is raised on ``perm_new_user_name``.
        """
        updates = []
        additions = []
        # build a list of permission to update and new permission to create
        for key, perm in value.items():
            if not key.startswith('perm_'):
                continue

            if key.startswith('perm_new_user'):
                new_perm = value.get('perm_new_user', False)
                new_user = value.get('perm_new_user_name', False)
                candidate = (new_user, new_perm)
                if new_user and new_perm and candidate not in additions:
                    additions.append(candidate)
                continue

            member = key[5:]
            if member == 'default' and value['private']:
                # set none for default when updating to private repo
                perm = 'repository.none'
            updates.append((member, perm))

        value['perms_updates'] = updates
        value['perms_new'] = additions

        sa = meta.Session
        for username, unused_perm in additions:
            try:
                active_users = sa.query(User).filter(User.active == True)
                self.user_db = active_users\
                    .filter(User.username == username).one()
            except Exception:
                msg = self.message('perm_new_user_name',
                                   state=State_obj)
                raise formencode.Invalid(msg, value, state,
                                         error_dict={'perm_new_user_name': msg})
        return value
r629 | ||||
class ValidSettings(formencode.validators.FancyValidator):
    """Drop fields that the settings form is not allowed to change."""

    def to_python(self, value, state):
        """Return ``value`` with its ``user`` key removed, if present.

        :param value: mapping of submitted form fields; modified in place
        :param state: formencode state object (unused)
        """
        # settings form can't edit user
        # NOTE: the key has to be deleted from ``value`` itself; writing
        # ``del['value']['user']`` indexes a list literal with a string and
        # raises TypeError as soon as a ``user`` key is submitted.
        if 'user' in value:
            del value['user']

        return value
r629 | ||||
class ValidPath(formencode.validators.FancyValidator):
    """Accept only values that name an existing directory."""

    def to_python(self, value, state):
        """Return ``value`` when ``os.path.isdir(value)`` holds.

        Otherwise ``formencode.Invalid`` is raised with the error attached
        to the ``paths_root_path`` field.
        """
        if os.path.isdir(value):
            return value

        msg = _('This is not a valid path')
        raise formencode.Invalid(msg, value, state,
                                 error_dict={'paths_root_path': msg})
r547 | ||||
def UniqSystemEmail(old_data):
    """Return a validator class rejecting e-mail addresses already in use.

    :param old_data: mapping read with ``.get('email')``; a value equal to it
        (after lower-casing the input) is accepted without a database lookup
    """
    class _UniqSystemEmail(formencode.validators.FancyValidator):

        def to_python(self, value, state):
            email = value.lower()
            #TODO:write test for MixedCase scenarios
            if old_data.get('email') == email:
                return email

            session = meta.Session()
            try:
                owner = session.query(User).filter(User.email == email).scalar()
                if owner:
                    raise formencode.Invalid(_("That e-mail address is already taken") ,
                                             email, state)
            finally:
                meta.Session.remove()

            return email

    return _UniqSystemEmail
r629 | ||||
class ValidSystemEmail(formencode.validators.FancyValidator):
    """Accept only e-mail addresses that belong to an existing ``User`` row."""

    def to_python(self, value, state):
        """Return the lower-cased address, or raise ``formencode.Invalid``.

        ``meta.Session.remove()`` is called whether or not the lookup succeeds.
        """
        email = value.lower()
        sa = meta.Session
        try:
            owner = sa.query(User).filter(User.email == email).scalar()
            if owner is None:
                raise formencode.Invalid(_("That e-mail address doesn't exist.") ,
                                         email, state)
        finally:
            meta.Session.remove()

        return email
r547 | ||||
class LdapLibValidator(formencode.validators.FancyValidator):
    """Check that the ``ldap`` module can be imported."""

    def to_python(self, value, state):
        """Return ``value`` unchanged, or raise ``LdapImportError`` when
        ``import ldap`` fails.
        """
        try:
            import ldap
        except ImportError:
            raise LdapImportError()
        else:
            return value
r547 | #=============================================================================== | |||
# FORMS | ||||
#=============================================================================== | ||||
class LoginForm(formencode.Schema):
    """Schema for the login form: ``username`` and ``password``, then ``ValidAuth``."""
    allow_extra_fields = True
    filter_extra_fields = True

    username = UnicodeString(
        strip=True,
        min=1,
        not_empty=True,
        messages={
            'empty': _('Please enter a login'),
            'tooShort': _('Enter a value %(min)i characters long or more'),
        }
    )

    password = UnicodeString(
        strip=True,
        min=6,
        not_empty=True,
        messages={
            'empty': _('Please enter a password'),
            'tooShort': _('Enter %(min)i characters or more'),
        }
    )

    # chained validators have access to all data
    chained_validators = [ValidAuth]
r629 | ||||
def UserForm(edit=False, old_data={}):
    """Return the schema class for creating or editing a user.

    :param edit: when truthy the schema has an optional ``new_password`` and
        an ``admin`` flag; otherwise it has a required ``password``
    :param old_data: passed on to ``ValidUsername`` and ``UniqSystemEmail``
    """
    class _UserForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = True

        username = All(
            UnicodeString(strip=True, min=1, not_empty=True),
            ValidUsername(edit, old_data),
        )

        # the password-related fields depend on the mode
        if edit:
            new_password = All(
                UnicodeString(strip=True, min=6, not_empty=False))
            admin = StringBoolean(if_missing=False)
        else:
            password = All(
                UnicodeString(strip=True, min=6, not_empty=True))

        active = StringBoolean(if_missing=False)
        name = UnicodeString(strip=True, min=1, not_empty=True)
        lastname = UnicodeString(strip=True, min=1, not_empty=True)
        email = All(Email(not_empty=True), UniqSystemEmail(old_data))

        chained_validators = [ValidPassword]

    return _UserForm
def RegisterForm(edit=False, old_data={}):
    """Return the schema class for the registration form.

    :param edit: passed on to ``ValidUsername``
    :param old_data: passed on to ``ValidUsername`` and ``UniqSystemEmail``
    """
    class _RegisterForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = True

        username = All(
            ValidUsername(edit, old_data),
            UnicodeString(strip=True, min=1, not_empty=True),
        )
        password = All(
            UnicodeString(strip=True, min=6, not_empty=True))
        password_confirmation = All(
            UnicodeString(strip=True, min=6, not_empty=True))
        active = StringBoolean(if_missing=False)
        name = UnicodeString(strip=True, min=1, not_empty=True)
        lastname = UnicodeString(strip=True, min=1, not_empty=True)
        email = All(Email(not_empty=True), UniqSystemEmail(old_data))

        # the match check is listed before the crypting validator
        chained_validators = [ValidPasswordsMatch, ValidPassword]

    return _RegisterForm
r547 | ||||
def PasswordResetForm():
    """Return the schema class for the password reset form (one ``email`` field)."""
    class _PasswordResetForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = True

        email = All(
            ValidSystemEmail(),
            Email(not_empty=True),
        )

    return _PasswordResetForm
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
    """Return the schema class for creating or editing a repository.

    :param edit: when truthy a ``user`` field is added to the schema; also
        passed on to ``ValidRepoName``
    :param old_data: passed on to ``ValidRepoName``
    :param supported_backends: allowed values for ``repo_type``
    """
    class _RepoForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = False

        repo_name = All(
            UnicodeString(strip=True, min=1, not_empty=True),
            ValidRepoName(edit, old_data),
        )
        description = UnicodeString(strip=True, min=1, not_empty=True)
        private = StringBoolean(if_missing=False)
        repo_type = OneOf(supported_backends)

        if edit:
            user = All(Int(not_empty=True), ValidRepoUser)

        chained_validators = [ValidPerms]

    return _RepoForm
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
    """Return the schema class for forking a repository.

    :param edit: passed on to ``ValidRepoName``
    :param old_data: passed on to ``ValidRepoName`` and ``ValidForkType``
    :param supported_backends: allowed values for ``repo_type``
    """
    class _RepoForkForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = False

        fork_name = All(
            UnicodeString(strip=True, min=1, not_empty=True),
            ValidRepoName(edit, old_data),
        )
        description = UnicodeString(strip=True, min=1, not_empty=True)
        private = StringBoolean(if_missing=False)
        repo_type = All(
            ValidForkType(old_data),
            OneOf(supported_backends),
        )

    return _RepoForkForm
def RepoSettingsForm(edit=False, old_data={}):
    """Return the schema class for the repository settings form.

    :param edit: passed on to ``ValidRepoName``
    :param old_data: passed on to ``ValidRepoName``
    """
    class _RepoForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = False

        repo_name = All(
            UnicodeString(strip=True, min=1, not_empty=True),
            ValidRepoName(edit, old_data),
        )
        description = UnicodeString(strip=True, min=1, not_empty=True)
        private = StringBoolean(if_missing=False)

        chained_validators = [ValidPerms, ValidSettings]

    return _RepoForm
def ApplicationSettingsForm():
    """Return the schema class for the application settings form
    (``rhodecode_title`` and ``rhodecode_realm``, both required).
    """
    class _ApplicationSettingsForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = False

        rhodecode_title = UnicodeString(strip=True, min=1, not_empty=True)
        rhodecode_realm = UnicodeString(strip=True, min=1, not_empty=True)

    return _ApplicationSettingsForm
r629 | ||||
def ApplicationUiSettingsForm():
    """Return the schema class for the ui settings form (ssl flag, root
    path and the four hook switches).
    """
    class _ApplicationUiSettingsForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = False

        web_push_ssl = OneOf(['true', 'false'], if_missing='false')
        paths_root_path = All(
            ValidPath(),
            UnicodeString(strip=True, min=1, not_empty=True),
        )

        # hook switches; a missing key validates to False
        hooks_changegroup_update = OneOf(['True', 'False'],
                                         if_missing=False)
        hooks_changegroup_repo_size = OneOf(['True', 'False'],
                                            if_missing=False)
        hooks_pretxnchangegroup_push_logger = OneOf(['True', 'False'],
                                                    if_missing=False)
        hooks_preoutgoing_pull_logger = OneOf(['True', 'False'],
                                              if_missing=False)

    return _ApplicationUiSettingsForm
def DefaultPermissionsForm(perms_choices, register_choices, create_choices):
    """Return the schema class for the default permissions form.

    :param perms_choices: allowed values for ``default_perm``
    :param register_choices: allowed values for ``default_register``
    :param create_choices: allowed values for ``default_create``
    """
    class _DefaultPermissionsForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = True

        overwrite_default = StringBoolean(if_missing=False)
        anonymous = OneOf(['True', 'False'], if_missing=False)

        default_perm = OneOf(perms_choices)
        default_register = OneOf(register_choices)
        default_create = OneOf(create_choices)

    return _DefaultPermissionsForm
r705 | ||||
def LdapSettingsForm():
    """Return the schema class for the LDAP settings form.

    ``LdapLibValidator`` is listed as a pre-validator of the schema.
    """
    class _LdapSettingsForm(formencode.Schema):
        allow_extra_fields = True
        filter_extra_fields = True
        pre_validators = [LdapLibValidator]

        ldap_active = StringBoolean(if_missing=False)
        ldap_host = UnicodeString(strip=True)
        ldap_port = Number(strip=True)
        ldap_ldaps = StringBoolean(if_missing=False)
        ldap_dn_user = UnicodeString(strip=True)
        ldap_dn_pass = UnicodeString(strip=True)
        ldap_base_dn = UnicodeString(strip=True)

    return _LdapSettingsForm