##// END OF EJS Templates
fixed ldap tests when ldap lib is installed
marcink -
r3674:ff2ea58d beta
parent child Browse files
Show More
@@ -1,387 +1,384 b''
1 1 """ this is forms validation classes
2 2 http://formencode.org/module-formencode.validators.html
3 3 for list off all availible validators
4 4
5 5 we can create our own validators
6 6
7 7 The table below outlines the options which can be used in a schema in addition to the validators themselves
8 8 pre_validators [] These validators will be applied before the schema
9 9 chained_validators [] These validators will be applied after the schema
10 10 allow_extra_fields False If True, then it is not an error when keys that aren't associated with a validator are present
11 11 filter_extra_fields False If True, then keys that aren't associated with a validator are removed
12 12 if_key_missing NoDefault If this is given, then any keys that aren't available but are expected will be replaced with this value (and then validated). This does not override a present .if_missing attribute on validators. NoDefault is a special FormEncode class to mean that no default values has been specified and therefore missing keys shouldn't take a default value.
13 13 ignore_key_missing False If True, then missing keys will be missing in the result, if the validator doesn't have .if_missing on it already
14 14
15 15
16 16 <name> = formencode.validators.<name of validator>
17 17 <name> must equal form name
18 18 list=[1,2,3,4,5]
19 19 for SELECT use formencode.All(OneOf(list), Int())
20 20
21 21 """
22 22 import logging
23 23
24 24 import formencode
25 25 from formencode import All
26 26
27 27 from pylons.i18n.translation import _
28 28
29 29 from rhodecode.model import validators as v
30 30 from rhodecode import BACKENDS
31 31
32 32 log = logging.getLogger(__name__)
33 33
34 34
35 35 class LoginForm(formencode.Schema):
36 36 allow_extra_fields = True
37 37 filter_extra_fields = True
38 38 username = v.UnicodeString(
39 39 strip=True,
40 40 min=1,
41 41 not_empty=True,
42 42 messages={
43 43 'empty': _(u'Please enter a login'),
44 44 'tooShort': _(u'Enter a value %(min)i characters long or more')}
45 45 )
46 46
47 47 password = v.UnicodeString(
48 48 strip=False,
49 49 min=3,
50 50 not_empty=True,
51 51 messages={
52 52 'empty': _(u'Please enter a password'),
53 53 'tooShort': _(u'Enter %(min)i characters or more')}
54 54 )
55 55
56 56 remember = v.StringBoolean(if_missing=False)
57 57
58 58 chained_validators = [v.ValidAuth()]
59 59
60 60
61 61 def UserForm(edit=False, old_data={}):
62 62 class _UserForm(formencode.Schema):
63 63 allow_extra_fields = True
64 64 filter_extra_fields = True
65 65 username = All(v.UnicodeString(strip=True, min=1, not_empty=True),
66 66 v.ValidUsername(edit, old_data))
67 67 if edit:
68 68 new_password = All(
69 69 v.ValidPassword(),
70 70 v.UnicodeString(strip=False, min=6, not_empty=False)
71 71 )
72 72 password_confirmation = All(
73 73 v.ValidPassword(),
74 74 v.UnicodeString(strip=False, min=6, not_empty=False),
75 75 )
76 76 admin = v.StringBoolean(if_missing=False)
77 77 else:
78 78 password = All(
79 79 v.ValidPassword(),
80 80 v.UnicodeString(strip=False, min=6, not_empty=True)
81 81 )
82 82 password_confirmation = All(
83 83 v.ValidPassword(),
84 84 v.UnicodeString(strip=False, min=6, not_empty=False)
85 85 )
86 86
87 87 active = v.StringBoolean(if_missing=False)
88 88 firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
89 89 lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
90 90 email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
91 91
92 92 chained_validators = [v.ValidPasswordsMatch()]
93 93
94 94 return _UserForm
95 95
96 96
97 97 def UserGroupForm(edit=False, old_data={}, available_members=[]):
98 98 class _UserGroupForm(formencode.Schema):
99 99 allow_extra_fields = True
100 100 filter_extra_fields = True
101 101
102 102 users_group_name = All(
103 103 v.UnicodeString(strip=True, min=1, not_empty=True),
104 104 v.ValidUserGroup(edit, old_data)
105 105 )
106 106
107 107 users_group_active = v.StringBoolean(if_missing=False)
108 108
109 109 if edit:
110 110 users_group_members = v.OneOf(
111 111 available_members, hideList=False, testValueList=True,
112 112 if_missing=None, not_empty=False
113 113 )
114 114
115 115 return _UserGroupForm
116 116
117 117
118 118 def ReposGroupForm(edit=False, old_data={}, available_groups=[],
119 119 can_create_in_root=False):
120 120 class _ReposGroupForm(formencode.Schema):
121 121 allow_extra_fields = True
122 122 filter_extra_fields = False
123 123
124 124 group_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
125 125 v.SlugifyName())
126 126 group_description = v.UnicodeString(strip=True, min=1,
127 127 not_empty=False)
128 128 if edit:
129 129 #FIXME: do a special check that we cannot move a group to one of
130 130 #it's children
131 131 pass
132 132 group_parent_id = All(v.CanCreateGroup(can_create_in_root),
133 133 v.OneOf(available_groups, hideList=False,
134 134 testValueList=True,
135 135 if_missing=None, not_empty=True))
136 136 enable_locking = v.StringBoolean(if_missing=False)
137 137 recursive = v.StringBoolean(if_missing=False)
138 138 chained_validators = [v.ValidReposGroup(edit, old_data),
139 139 v.ValidPerms('group')]
140 140
141 141 return _ReposGroupForm
142 142
143 143
144 144 def RegisterForm(edit=False, old_data={}):
145 145 class _RegisterForm(formencode.Schema):
146 146 allow_extra_fields = True
147 147 filter_extra_fields = True
148 148 username = All(
149 149 v.ValidUsername(edit, old_data),
150 150 v.UnicodeString(strip=True, min=1, not_empty=True)
151 151 )
152 152 password = All(
153 153 v.ValidPassword(),
154 154 v.UnicodeString(strip=False, min=6, not_empty=True)
155 155 )
156 156 password_confirmation = All(
157 157 v.ValidPassword(),
158 158 v.UnicodeString(strip=False, min=6, not_empty=True)
159 159 )
160 160 active = v.StringBoolean(if_missing=False)
161 161 firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
162 162 lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
163 163 email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
164 164
165 165 chained_validators = [v.ValidPasswordsMatch()]
166 166
167 167 return _RegisterForm
168 168
169 169
170 170 def PasswordResetForm():
171 171 class _PasswordResetForm(formencode.Schema):
172 172 allow_extra_fields = True
173 173 filter_extra_fields = True
174 174 email = All(v.ValidSystemEmail(), v.Email(not_empty=True))
175 175 return _PasswordResetForm
176 176
177 177
178 178 def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
179 179 repo_groups=[], landing_revs=[]):
180 180 class _RepoForm(formencode.Schema):
181 181 allow_extra_fields = True
182 182 filter_extra_fields = False
183 183 repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
184 184 v.SlugifyName())
185 185 repo_group = All(v.CanWriteGroup(old_data),
186 186 v.OneOf(repo_groups, hideList=True))
187 187 repo_type = v.OneOf(supported_backends)
188 188 repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
189 189 repo_private = v.StringBoolean(if_missing=False)
190 190 repo_landing_rev = v.OneOf(landing_revs, hideList=True)
191 191 clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
192 192
193 193 repo_enable_statistics = v.StringBoolean(if_missing=False)
194 194 repo_enable_downloads = v.StringBoolean(if_missing=False)
195 195 repo_enable_locking = v.StringBoolean(if_missing=False)
196 196
197 197 if edit:
198 198 #this is repo owner
199 199 user = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
200 200
201 201 chained_validators = [v.ValidCloneUri(),
202 202 v.ValidRepoName(edit, old_data)]
203 203 return _RepoForm
204 204
205 205
206 206 def RepoPermsForm():
207 207 class _RepoPermsForm(formencode.Schema):
208 208 allow_extra_fields = True
209 209 filter_extra_fields = False
210 210 chained_validators = [v.ValidPerms()]
211 211 return _RepoPermsForm
212 212
213 213
214 214 def RepoFieldForm():
215 215 class _RepoFieldForm(formencode.Schema):
216 216 filter_extra_fields = True
217 217 allow_extra_fields = True
218 218
219 219 new_field_key = All(v.FieldKey(),
220 220 v.UnicodeString(strip=True, min=3, not_empty=True))
221 221 new_field_value = v.UnicodeString(not_empty=False, if_missing='')
222 222 new_field_type = v.OneOf(['str', 'unicode', 'list', 'tuple'],
223 223 if_missing='str')
224 224 new_field_label = v.UnicodeString(not_empty=False)
225 225 new_field_desc = v.UnicodeString(not_empty=False)
226 226
227 227 return _RepoFieldForm
228 228
229 229
230 230 def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
231 231 repo_groups=[], landing_revs=[]):
232 232 class _RepoForkForm(formencode.Schema):
233 233 allow_extra_fields = True
234 234 filter_extra_fields = False
235 235 repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
236 236 v.SlugifyName())
237 237 repo_group = All(v.CanWriteGroup(),
238 238 v.OneOf(repo_groups, hideList=True))
239 239 repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
240 240 description = v.UnicodeString(strip=True, min=1, not_empty=True)
241 241 private = v.StringBoolean(if_missing=False)
242 242 copy_permissions = v.StringBoolean(if_missing=False)
243 243 update_after_clone = v.StringBoolean(if_missing=False)
244 244 fork_parent_id = v.UnicodeString()
245 245 chained_validators = [v.ValidForkName(edit, old_data)]
246 246 landing_rev = v.OneOf(landing_revs, hideList=True)
247 247
248 248 return _RepoForkForm
249 249
250 250
251 251 def ApplicationSettingsForm():
252 252 class _ApplicationSettingsForm(formencode.Schema):
253 253 allow_extra_fields = True
254 254 filter_extra_fields = False
255 255 rhodecode_title = v.UnicodeString(strip=True, min=1, not_empty=True)
256 256 rhodecode_realm = v.UnicodeString(strip=True, min=1, not_empty=True)
257 257 rhodecode_ga_code = v.UnicodeString(strip=True, min=1, not_empty=False)
258 258
259 259 return _ApplicationSettingsForm
260 260
261 261
262 262 def ApplicationVisualisationForm():
263 263 class _ApplicationVisualisationForm(formencode.Schema):
264 264 allow_extra_fields = True
265 265 filter_extra_fields = False
266 266 rhodecode_show_public_icon = v.StringBoolean(if_missing=False)
267 267 rhodecode_show_private_icon = v.StringBoolean(if_missing=False)
268 268 rhodecode_stylify_metatags = v.StringBoolean(if_missing=False)
269 269
270 270 rhodecode_lightweight_dashboard = v.StringBoolean(if_missing=False)
271 271 rhodecode_repository_fields = v.StringBoolean(if_missing=False)
272 272 rhodecode_lightweight_journal = v.StringBoolean(if_missing=False)
273 273
274 274 return _ApplicationVisualisationForm
275 275
276 276
277 277 def ApplicationUiSettingsForm():
278 278 class _ApplicationUiSettingsForm(formencode.Schema):
279 279 allow_extra_fields = True
280 280 filter_extra_fields = False
281 281 web_push_ssl = v.StringBoolean(if_missing=False)
282 282 paths_root_path = All(
283 283 v.ValidPath(),
284 284 v.UnicodeString(strip=True, min=1, not_empty=True)
285 285 )
286 286 hooks_changegroup_update = v.StringBoolean(if_missing=False)
287 287 hooks_changegroup_repo_size = v.StringBoolean(if_missing=False)
288 288 hooks_changegroup_push_logger = v.StringBoolean(if_missing=False)
289 289 hooks_outgoing_pull_logger = v.StringBoolean(if_missing=False)
290 290
291 291 extensions_largefiles = v.StringBoolean(if_missing=False)
292 292 extensions_hgsubversion = v.StringBoolean(if_missing=False)
293 293 extensions_hggit = v.StringBoolean(if_missing=False)
294 294
295 295 return _ApplicationUiSettingsForm
296 296
297 297
298 298 def DefaultPermissionsForm(repo_perms_choices, group_perms_choices,
299 299 register_choices, create_choices, fork_choices):
300 300 class _DefaultPermissionsForm(formencode.Schema):
301 301 allow_extra_fields = True
302 302 filter_extra_fields = True
303 303 overwrite_default_repo = v.StringBoolean(if_missing=False)
304 304 overwrite_default_group = v.StringBoolean(if_missing=False)
305 305 anonymous = v.StringBoolean(if_missing=False)
306 306 default_repo_perm = v.OneOf(repo_perms_choices)
307 307 default_group_perm = v.OneOf(group_perms_choices)
308 308 default_register = v.OneOf(register_choices)
309 309 default_create = v.OneOf(create_choices)
310 310 default_fork = v.OneOf(fork_choices)
311 311
312 312 return _DefaultPermissionsForm
313 313
314 314
315 315 def DefaultsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
316 316 class _DefaultsForm(formencode.Schema):
317 317 allow_extra_fields = True
318 318 filter_extra_fields = True
319 319 default_repo_type = v.OneOf(supported_backends)
320 320 default_repo_private = v.StringBoolean(if_missing=False)
321 321 default_repo_enable_statistics = v.StringBoolean(if_missing=False)
322 322 default_repo_enable_downloads = v.StringBoolean(if_missing=False)
323 323 default_repo_enable_locking = v.StringBoolean(if_missing=False)
324 324
325 325 return _DefaultsForm
326 326
327 327
328 328 def LdapSettingsForm(tls_reqcert_choices, search_scope_choices,
329 329 tls_kind_choices):
330 330 class _LdapSettingsForm(formencode.Schema):
331 331 allow_extra_fields = True
332 332 filter_extra_fields = True
333 333 #pre_validators = [LdapLibValidator]
334 334 ldap_active = v.StringBoolean(if_missing=False)
335 335 ldap_host = v.UnicodeString(strip=True,)
336 336 ldap_port = v.Number(strip=True,)
337 337 ldap_tls_kind = v.OneOf(tls_kind_choices)
338 338 ldap_tls_reqcert = v.OneOf(tls_reqcert_choices)
339 339 ldap_dn_user = v.UnicodeString(strip=True,)
340 340 ldap_dn_pass = v.UnicodeString(strip=True,)
341 341 ldap_base_dn = v.UnicodeString(strip=True,)
342 342 ldap_filter = v.UnicodeString(strip=True,)
343 343 ldap_search_scope = v.OneOf(search_scope_choices)
344 ldap_attr_login = All(
345 v.AttrLoginValidator(),
346 v.UnicodeString(strip=True,)
347 )
344 ldap_attr_login = v.AttrLoginValidator()(not_empty=True)
348 345 ldap_attr_firstname = v.UnicodeString(strip=True,)
349 346 ldap_attr_lastname = v.UnicodeString(strip=True,)
350 347 ldap_attr_email = v.UnicodeString(strip=True,)
351 348
352 349 return _LdapSettingsForm
353 350
354 351
355 352 def UserExtraEmailForm():
356 353 class _UserExtraEmailForm(formencode.Schema):
357 354 email = All(v.UniqSystemEmail(), v.Email(not_empty=True))
358 355 return _UserExtraEmailForm
359 356
360 357
361 358 def UserExtraIpForm():
362 359 class _UserExtraIpForm(formencode.Schema):
363 360 ip = v.ValidIp()(not_empty=True)
364 361 return _UserExtraIpForm
365 362
366 363
367 364 def PullRequestForm(repo_id):
368 365 class _PullRequestForm(formencode.Schema):
369 366 allow_extra_fields = True
370 367 filter_extra_fields = True
371 368
372 369 user = v.UnicodeString(strip=True, required=True)
373 370 org_repo = v.UnicodeString(strip=True, required=True)
374 371 org_ref = v.UnicodeString(strip=True, required=True)
375 372 other_repo = v.UnicodeString(strip=True, required=True)
376 373 other_ref = v.UnicodeString(strip=True, required=True)
377 374 revisions = All(#v.NotReviewedRevisions(repo_id)(),
378 375 v.UniqueList(not_empty=True))
379 376 review_members = v.UniqueList(not_empty=True)
380 377
381 378 pullrequest_title = v.UnicodeString(strip=True, required=True, min=3)
382 379 pullrequest_desc = v.UnicodeString(strip=True, required=False)
383 380
384 381 ancestor_rev = v.UnicodeString(strip=True, required=True)
385 382 merge_rev = v.UnicodeString(strip=True, required=True)
386 383
387 384 return _PullRequestForm
@@ -1,813 +1,807 b''
1 1 """
2 2 Set of generic validators
3 3 """
4 4 import os
5 5 import re
6 6 import formencode
7 7 import logging
8 8 from collections import defaultdict
9 9 from pylons.i18n.translation import _
10 10 from webhelpers.pylonslib.secure_form import authentication_token
11 11
12 12 from formencode.validators import (
13 13 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
14 14 NotEmpty, IPAddress, CIDR
15 15 )
16 16 from rhodecode.lib.compat import OrderedSet
17 17 from rhodecode.lib import ipaddr
18 18 from rhodecode.lib.utils import repo_name_slug
19 19 from rhodecode.lib.utils2 import safe_int, str2bool
20 20 from rhodecode.model.db import RepoGroup, Repository, UserGroup, User,\
21 21 ChangesetStatus
22 22 from rhodecode.lib.exceptions import LdapImportError
23 23 from rhodecode.config.routing import ADMIN_PREFIX
24 24 from rhodecode.lib.auth import HasReposGroupPermissionAny, HasPermissionAny
25 25
26 26 # silence warnings and pylint
27 27 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
28 28 NotEmpty, IPAddress, CIDR
29 29
30 30 log = logging.getLogger(__name__)
31 31
32 32
33 33 class UniqueList(formencode.FancyValidator):
34 34 """
35 35 Unique List !
36 36 """
37 37 messages = dict(
38 38 empty=_('Value cannot be an empty list'),
39 39 missing_value=_('Value cannot be an empty list'),
40 40 )
41 41
42 42 def _to_python(self, value, state):
43 43 if isinstance(value, list):
44 44 return value
45 45 elif isinstance(value, set):
46 46 return list(value)
47 47 elif isinstance(value, tuple):
48 48 return list(value)
49 49 elif value is None:
50 50 return []
51 51 else:
52 52 return [value]
53 53
54 54 def empty_value(self, value):
55 55 return []
56 56
57 57
58 58 class StateObj(object):
59 59 """
60 60 this is needed to translate the messages using _() in validators
61 61 """
62 62 _ = staticmethod(_)
63 63
64 64
65 65 def M(self, key, state=None, **kwargs):
66 66 """
67 67 returns string from self.message based on given key,
68 68 passed kw params are used to substitute %(named)s params inside
69 69 translated strings
70 70
71 71 :param msg:
72 72 :param state:
73 73 """
74 74 if state is None:
75 75 state = StateObj()
76 76 else:
77 77 state._ = staticmethod(_)
78 78 #inject validator into state object
79 79 return self.message(key, state, **kwargs)
80 80
81 81
82 82 def ValidUsername(edit=False, old_data={}):
83 83 class _validator(formencode.validators.FancyValidator):
84 84 messages = {
85 85 'username_exists': _(u'Username "%(username)s" already exists'),
86 86 'system_invalid_username':
87 87 _(u'Username "%(username)s" is forbidden'),
88 88 'invalid_username':
89 89 _(u'Username may only contain alphanumeric characters '
90 90 'underscores, periods or dashes and must begin with '
91 91 'alphanumeric character')
92 92 }
93 93
94 94 def validate_python(self, value, state):
95 95 if value in ['default', 'new_user']:
96 96 msg = M(self, 'system_invalid_username', state, username=value)
97 97 raise formencode.Invalid(msg, value, state)
98 98 #check if user is unique
99 99 old_un = None
100 100 if edit:
101 101 old_un = User.get(old_data.get('user_id')).username
102 102
103 103 if old_un != value or not edit:
104 104 if User.get_by_username(value, case_insensitive=True):
105 105 msg = M(self, 'username_exists', state, username=value)
106 106 raise formencode.Invalid(msg, value, state)
107 107
108 108 if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]*$', value) is None:
109 109 msg = M(self, 'invalid_username', state)
110 110 raise formencode.Invalid(msg, value, state)
111 111 return _validator
112 112
113 113
114 114 def ValidRepoUser():
115 115 class _validator(formencode.validators.FancyValidator):
116 116 messages = {
117 117 'invalid_username': _(u'Username %(username)s is not valid')
118 118 }
119 119
120 120 def validate_python(self, value, state):
121 121 try:
122 122 User.query().filter(User.active == True)\
123 123 .filter(User.username == value).one()
124 124 except Exception:
125 125 msg = M(self, 'invalid_username', state, username=value)
126 126 raise formencode.Invalid(msg, value, state,
127 127 error_dict=dict(username=msg)
128 128 )
129 129
130 130 return _validator
131 131
132 132
133 133 def ValidUserGroup(edit=False, old_data={}):
134 134 class _validator(formencode.validators.FancyValidator):
135 135 messages = {
136 136 'invalid_group': _(u'Invalid user group name'),
137 137 'group_exist': _(u'User group "%(usergroup)s" already exists'),
138 138 'invalid_usergroup_name':
139 139 _(u'user group name may only contain alphanumeric '
140 140 'characters underscores, periods or dashes and must begin '
141 141 'with alphanumeric character')
142 142 }
143 143
144 144 def validate_python(self, value, state):
145 145 if value in ['default']:
146 146 msg = M(self, 'invalid_group', state)
147 147 raise formencode.Invalid(msg, value, state,
148 148 error_dict=dict(users_group_name=msg)
149 149 )
150 150 #check if group is unique
151 151 old_ugname = None
152 152 if edit:
153 153 old_id = old_data.get('users_group_id')
154 154 old_ugname = UserGroup.get(old_id).users_group_name
155 155
156 156 if old_ugname != value or not edit:
157 157 is_existing_group = UserGroup.get_by_group_name(value,
158 158 case_insensitive=True)
159 159 if is_existing_group:
160 160 msg = M(self, 'group_exist', state, usergroup=value)
161 161 raise formencode.Invalid(msg, value, state,
162 162 error_dict=dict(users_group_name=msg)
163 163 )
164 164
165 165 if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
166 166 msg = M(self, 'invalid_usergroup_name', state)
167 167 raise formencode.Invalid(msg, value, state,
168 168 error_dict=dict(users_group_name=msg)
169 169 )
170 170
171 171 return _validator
172 172
173 173
174 174 def ValidReposGroup(edit=False, old_data={}):
175 175 class _validator(formencode.validators.FancyValidator):
176 176 messages = {
177 177 'group_parent_id': _(u'Cannot assign this group as parent'),
178 178 'group_exists': _(u'Group "%(group_name)s" already exists'),
179 179 'repo_exists':
180 180 _(u'Repository with name "%(group_name)s" already exists')
181 181 }
182 182
183 183 def validate_python(self, value, state):
184 184 # TODO WRITE VALIDATIONS
185 185 group_name = value.get('group_name')
186 186 group_parent_id = value.get('group_parent_id')
187 187
188 188 # slugify repo group just in case :)
189 189 slug = repo_name_slug(group_name)
190 190
191 191 # check for parent of self
192 192 parent_of_self = lambda: (
193 193 old_data['group_id'] == int(group_parent_id)
194 194 if group_parent_id else False
195 195 )
196 196 if edit and parent_of_self():
197 197 msg = M(self, 'group_parent_id', state)
198 198 raise formencode.Invalid(msg, value, state,
199 199 error_dict=dict(group_parent_id=msg)
200 200 )
201 201
202 202 old_gname = None
203 203 if edit:
204 204 old_gname = RepoGroup.get(old_data.get('group_id')).group_name
205 205
206 206 if old_gname != group_name or not edit:
207 207
208 208 # check group
209 209 gr = RepoGroup.query()\
210 210 .filter(RepoGroup.group_name == slug)\
211 211 .filter(RepoGroup.group_parent_id == group_parent_id)\
212 212 .scalar()
213 213
214 214 if gr:
215 215 msg = M(self, 'group_exists', state, group_name=slug)
216 216 raise formencode.Invalid(msg, value, state,
217 217 error_dict=dict(group_name=msg)
218 218 )
219 219
220 220 # check for same repo
221 221 repo = Repository.query()\
222 222 .filter(Repository.repo_name == slug)\
223 223 .scalar()
224 224
225 225 if repo:
226 226 msg = M(self, 'repo_exists', state, group_name=slug)
227 227 raise formencode.Invalid(msg, value, state,
228 228 error_dict=dict(group_name=msg)
229 229 )
230 230
231 231 return _validator
232 232
233 233
234 234 def ValidPassword():
235 235 class _validator(formencode.validators.FancyValidator):
236 236 messages = {
237 237 'invalid_password':
238 238 _(u'Invalid characters (non-ascii) in password')
239 239 }
240 240
241 241 def validate_python(self, value, state):
242 242 try:
243 243 (value or '').decode('ascii')
244 244 except UnicodeError:
245 245 msg = M(self, 'invalid_password', state)
246 246 raise formencode.Invalid(msg, value, state,)
247 247 return _validator
248 248
249 249
250 250 def ValidPasswordsMatch():
251 251 class _validator(formencode.validators.FancyValidator):
252 252 messages = {
253 253 'password_mismatch': _(u'Passwords do not match'),
254 254 }
255 255
256 256 def validate_python(self, value, state):
257 257
258 258 pass_val = value.get('password') or value.get('new_password')
259 259 if pass_val != value['password_confirmation']:
260 260 msg = M(self, 'password_mismatch', state)
261 261 raise formencode.Invalid(msg, value, state,
262 262 error_dict=dict(password_confirmation=msg)
263 263 )
264 264 return _validator
265 265
266 266
267 267 def ValidAuth():
268 268 class _validator(formencode.validators.FancyValidator):
269 269 messages = {
270 270 'invalid_password': _(u'invalid password'),
271 271 'invalid_username': _(u'invalid user name'),
272 272 'disabled_account': _(u'Your account is disabled')
273 273 }
274 274
275 275 def validate_python(self, value, state):
276 276 from rhodecode.lib.auth import authenticate
277 277
278 278 password = value['password']
279 279 username = value['username']
280 280
281 281 if not authenticate(username, password):
282 282 user = User.get_by_username(username)
283 283 if user and not user.active:
284 284 log.warning('user %s is disabled' % username)
285 285 msg = M(self, 'disabled_account', state)
286 286 raise formencode.Invalid(msg, value, state,
287 287 error_dict=dict(username=msg)
288 288 )
289 289 else:
290 290 log.warning('user %s failed to authenticate' % username)
291 291 msg = M(self, 'invalid_username', state)
292 292 msg2 = M(self, 'invalid_password', state)
293 293 raise formencode.Invalid(msg, value, state,
294 294 error_dict=dict(username=msg, password=msg2)
295 295 )
296 296 return _validator
297 297
298 298
299 299 def ValidAuthToken():
300 300 class _validator(formencode.validators.FancyValidator):
301 301 messages = {
302 302 'invalid_token': _(u'Token mismatch')
303 303 }
304 304
305 305 def validate_python(self, value, state):
306 306 if value != authentication_token():
307 307 msg = M(self, 'invalid_token', state)
308 308 raise formencode.Invalid(msg, value, state)
309 309 return _validator
310 310
311 311
312 312 def ValidRepoName(edit=False, old_data={}):
313 313 class _validator(formencode.validators.FancyValidator):
314 314 messages = {
315 315 'invalid_repo_name':
316 316 _(u'Repository name %(repo)s is disallowed'),
317 317 'repository_exists':
318 318 _(u'Repository named %(repo)s already exists'),
319 319 'repository_in_group_exists': _(u'Repository "%(repo)s" already '
320 320 'exists in group "%(group)s"'),
321 321 'same_group_exists': _(u'Repository group with name "%(repo)s" '
322 322 'already exists')
323 323 }
324 324
325 325 def _to_python(self, value, state):
326 326 repo_name = repo_name_slug(value.get('repo_name', ''))
327 327 repo_group = value.get('repo_group')
328 328 if repo_group:
329 329 gr = RepoGroup.get(repo_group)
330 330 group_path = gr.full_path
331 331 group_name = gr.group_name
332 332 # value needs to be aware of group name in order to check
333 333 # db key This is an actual just the name to store in the
334 334 # database
335 335 repo_name_full = group_path + RepoGroup.url_sep() + repo_name
336 336 else:
337 337 group_name = group_path = ''
338 338 repo_name_full = repo_name
339 339
340 340 value['repo_name'] = repo_name
341 341 value['repo_name_full'] = repo_name_full
342 342 value['group_path'] = group_path
343 343 value['group_name'] = group_name
344 344 return value
345 345
346 346 def validate_python(self, value, state):
347 347
348 348 repo_name = value.get('repo_name')
349 349 repo_name_full = value.get('repo_name_full')
350 350 group_path = value.get('group_path')
351 351 group_name = value.get('group_name')
352 352
353 353 if repo_name in [ADMIN_PREFIX, '']:
354 354 msg = M(self, 'invalid_repo_name', state, repo=repo_name)
355 355 raise formencode.Invalid(msg, value, state,
356 356 error_dict=dict(repo_name=msg)
357 357 )
358 358
359 359 rename = old_data.get('repo_name') != repo_name_full
360 360 create = not edit
361 361 if rename or create:
362 362
363 363 if group_path != '':
364 364 if Repository.get_by_repo_name(repo_name_full):
365 365 msg = M(self, 'repository_in_group_exists', state,
366 366 repo=repo_name, group=group_name)
367 367 raise formencode.Invalid(msg, value, state,
368 368 error_dict=dict(repo_name=msg)
369 369 )
370 370 elif RepoGroup.get_by_group_name(repo_name_full):
371 371 msg = M(self, 'same_group_exists', state,
372 372 repo=repo_name)
373 373 raise formencode.Invalid(msg, value, state,
374 374 error_dict=dict(repo_name=msg)
375 375 )
376 376
377 377 elif Repository.get_by_repo_name(repo_name_full):
378 378 msg = M(self, 'repository_exists', state,
379 379 repo=repo_name)
380 380 raise formencode.Invalid(msg, value, state,
381 381 error_dict=dict(repo_name=msg)
382 382 )
383 383 return value
384 384 return _validator
385 385
386 386
387 387 def ValidForkName(*args, **kwargs):
388 388 return ValidRepoName(*args, **kwargs)
389 389
390 390
391 391 def SlugifyName():
392 392 class _validator(formencode.validators.FancyValidator):
393 393
394 394 def _to_python(self, value, state):
395 395 return repo_name_slug(value)
396 396
397 397 def validate_python(self, value, state):
398 398 pass
399 399
400 400 return _validator
401 401
402 402
403 403 def ValidCloneUri():
404 404 from rhodecode.lib.utils import make_ui
405 405
406 406 def url_handler(repo_type, url, ui=None):
407 407 if repo_type == 'hg':
408 408 from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
409 409 from mercurial.httppeer import httppeer
410 410 if url.startswith('http'):
411 411 ## initially check if it's at least the proper URL
412 412 ## or does it pass basic auth
413 413 MercurialRepository._check_url(url)
414 414 httppeer(ui, url)._capabilities()
415 415 elif url.startswith('svn+http'):
416 416 from hgsubversion.svnrepo import svnremoterepo
417 417 svnremoterepo(ui, url).capabilities
418 418 elif url.startswith('git+http'):
419 419 raise NotImplementedError()
420 420 else:
421 421 raise Exception('clone from URI %s not allowed' % (url))
422 422
423 423 elif repo_type == 'git':
424 424 from rhodecode.lib.vcs.backends.git.repository import GitRepository
425 425 if url.startswith('http'):
426 426 ## initially check if it's at least the proper URL
427 427 ## or does it pass basic auth
428 428 GitRepository._check_url(url)
429 429 elif url.startswith('svn+http'):
430 430 raise NotImplementedError()
431 431 elif url.startswith('hg+http'):
432 432 raise NotImplementedError()
433 433 else:
434 434 raise Exception('clone from URI %s not allowed' % (url))
435 435
436 436 class _validator(formencode.validators.FancyValidator):
437 437 messages = {
438 438 'clone_uri': _(u'invalid clone url'),
439 439 'invalid_clone_uri': _(u'Invalid clone url, provide a '
440 440 'valid clone http(s)/svn+http(s) url')
441 441 }
442 442
443 443 def validate_python(self, value, state):
444 444 repo_type = value.get('repo_type')
445 445 url = value.get('clone_uri')
446 446
447 447 if not url:
448 448 pass
449 449 else:
450 450 try:
451 451 url_handler(repo_type, url, make_ui('db', clear_session=False))
452 452 except Exception:
453 453 log.exception('Url validation failed')
454 454 msg = M(self, 'clone_uri')
455 455 raise formencode.Invalid(msg, value, state,
456 456 error_dict=dict(clone_uri=msg)
457 457 )
458 458 return _validator
459 459
460 460
461 461 def ValidForkType(old_data={}):
462 462 class _validator(formencode.validators.FancyValidator):
463 463 messages = {
464 464 'invalid_fork_type': _(u'Fork have to be the same type as parent')
465 465 }
466 466
467 467 def validate_python(self, value, state):
468 468 if old_data['repo_type'] != value:
469 469 msg = M(self, 'invalid_fork_type', state)
470 470 raise formencode.Invalid(msg, value, state,
471 471 error_dict=dict(repo_type=msg)
472 472 )
473 473 return _validator
474 474
475 475
476 476 def CanWriteGroup(old_data=None):
477 477 class _validator(formencode.validators.FancyValidator):
478 478 messages = {
479 479 'permission_denied': _(u"You don't have permissions "
480 480 "to create repository in this group"),
481 481 'permission_denied_root': _(u"no permission to create repository "
482 482 "in root location")
483 483 }
484 484
485 485 def _to_python(self, value, state):
486 486 #root location
487 487 if value in [-1, "-1"]:
488 488 return None
489 489 return value
490 490
491 491 def validate_python(self, value, state):
492 492 gr = RepoGroup.get(value)
493 493 gr_name = gr.group_name if gr else None # None means ROOT location
494 494 val = HasReposGroupPermissionAny('group.write', 'group.admin')
495 495 can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
496 496 forbidden = not val(gr_name, 'can write into group validator')
497 497 value_changed = True # old_data['repo_group'].get('group_id') != safe_int(value)
498 498 if value_changed: # do check if we changed the value
499 499 #parent group need to be existing
500 500 if gr and forbidden:
501 501 msg = M(self, 'permission_denied', state)
502 502 raise formencode.Invalid(msg, value, state,
503 503 error_dict=dict(repo_type=msg)
504 504 )
505 505 ## check if we can write to root location !
506 506 elif gr is None and not can_create_repos():
507 507 msg = M(self, 'permission_denied_root', state)
508 508 raise formencode.Invalid(msg, value, state,
509 509 error_dict=dict(repo_type=msg)
510 510 )
511 511
512 512 return _validator
513 513
514 514
515 515 def CanCreateGroup(can_create_in_root=False):
516 516 class _validator(formencode.validators.FancyValidator):
517 517 messages = {
518 518 'permission_denied': _(u"You don't have permissions "
519 519 "to create a group in this location")
520 520 }
521 521
522 522 def to_python(self, value, state):
523 523 #root location
524 524 if value in [-1, "-1"]:
525 525 return None
526 526 return value
527 527
528 528 def validate_python(self, value, state):
529 529 gr = RepoGroup.get(value)
530 530 gr_name = gr.group_name if gr else None # None means ROOT location
531 531
532 532 if can_create_in_root and gr is None:
533 533 #we can create in root, we're fine no validations required
534 534 return
535 535
536 536 forbidden_in_root = gr is None and not can_create_in_root
537 537 val = HasReposGroupPermissionAny('group.admin')
538 538 forbidden = not val(gr_name, 'can create group validator')
539 539 if forbidden_in_root or forbidden:
540 540 msg = M(self, 'permission_denied', state)
541 541 raise formencode.Invalid(msg, value, state,
542 542 error_dict=dict(group_parent_id=msg)
543 543 )
544 544
545 545 return _validator
546 546
547 547
548 548 def ValidPerms(type_='repo'):
549 549 if type_ == 'group':
550 550 EMPTY_PERM = 'group.none'
551 551 elif type_ == 'repo':
552 552 EMPTY_PERM = 'repository.none'
553 553
554 554 class _validator(formencode.validators.FancyValidator):
555 555 messages = {
556 556 'perm_new_member_name':
557 557 _(u'This username or user group name is not valid')
558 558 }
559 559
560 560 def to_python(self, value, state):
561 561 perms_update = OrderedSet()
562 562 perms_new = OrderedSet()
563 563 # build a list of permission to update and new permission to create
564 564
565 565 #CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
566 566 new_perms_group = defaultdict(dict)
567 567 for k, v in value.copy().iteritems():
568 568 if k.startswith('perm_new_member'):
569 569 del value[k]
570 570 _type, part = k.split('perm_new_member_')
571 571 args = part.split('_')
572 572 if len(args) == 1:
573 573 new_perms_group[args[0]]['perm'] = v
574 574 elif len(args) == 2:
575 575 _key, pos = args
576 576 new_perms_group[pos][_key] = v
577 577
578 578 # fill new permissions in order of how they were added
579 579 for k in sorted(map(int, new_perms_group.keys())):
580 580 perm_dict = new_perms_group[str(k)]
581 581 new_member = perm_dict.get('name')
582 582 new_perm = perm_dict.get('perm')
583 583 new_type = perm_dict.get('type')
584 584 if new_member and new_perm and new_type:
585 585 perms_new.add((new_member, new_perm, new_type))
586 586
587 587 for k, v in value.iteritems():
588 588 if k.startswith('u_perm_') or k.startswith('g_perm_'):
589 589 member = k[7:]
590 590 t = {'u': 'user',
591 591 'g': 'users_group'
592 592 }[k[0]]
593 593 if member == 'default':
594 594 if str2bool(value.get('repo_private')):
595 595 # set none for default when updating to
596 596 # private repo protects agains form manipulation
597 597 v = EMPTY_PERM
598 598 perms_update.add((member, v, t))
599 599
600 600 value['perms_updates'] = list(perms_update)
601 601 value['perms_new'] = list(perms_new)
602 602
603 603 # update permissions
604 604 for k, v, t in perms_new:
605 605 try:
606 606 if t is 'user':
607 607 self.user_db = User.query()\
608 608 .filter(User.active == True)\
609 609 .filter(User.username == k).one()
610 610 if t is 'users_group':
611 611 self.user_db = UserGroup.query()\
612 612 .filter(UserGroup.users_group_active == True)\
613 613 .filter(UserGroup.users_group_name == k).one()
614 614
615 615 except Exception:
616 616 log.exception('Updated permission failed')
617 617 msg = M(self, 'perm_new_member_type', state)
618 618 raise formencode.Invalid(msg, value, state,
619 619 error_dict=dict(perm_new_member_name=msg)
620 620 )
621 621 return value
622 622 return _validator
623 623
624 624
625 625 def ValidSettings():
626 626 class _validator(formencode.validators.FancyValidator):
627 627 def _to_python(self, value, state):
628 628 # settings form for users that are not admin
629 629 # can't edit certain parameters, it's extra backup if they mangle
630 630 # with forms
631 631
632 632 forbidden_params = [
633 633 'user', 'repo_type', 'repo_enable_locking',
634 634 'repo_enable_downloads', 'repo_enable_statistics'
635 635 ]
636 636
637 637 for param in forbidden_params:
638 638 if param in value:
639 639 del value[param]
640 640 return value
641 641
642 642 def validate_python(self, value, state):
643 643 pass
644 644 return _validator
645 645
646 646
647 647 def ValidPath():
648 648 class _validator(formencode.validators.FancyValidator):
649 649 messages = {
650 650 'invalid_path': _(u'This is not a valid path')
651 651 }
652 652
653 653 def validate_python(self, value, state):
654 654 if not os.path.isdir(value):
655 655 msg = M(self, 'invalid_path', state)
656 656 raise formencode.Invalid(msg, value, state,
657 657 error_dict=dict(paths_root_path=msg)
658 658 )
659 659 return _validator
660 660
661 661
662 662 def UniqSystemEmail(old_data={}):
663 663 class _validator(formencode.validators.FancyValidator):
664 664 messages = {
665 665 'email_taken': _(u'This e-mail address is already taken')
666 666 }
667 667
668 668 def _to_python(self, value, state):
669 669 return value.lower()
670 670
671 671 def validate_python(self, value, state):
672 672 if (old_data.get('email') or '').lower() != value:
673 673 user = User.get_by_email(value, case_insensitive=True)
674 674 if user:
675 675 msg = M(self, 'email_taken', state)
676 676 raise formencode.Invalid(msg, value, state,
677 677 error_dict=dict(email=msg)
678 678 )
679 679 return _validator
680 680
681 681
682 682 def ValidSystemEmail():
683 683 class _validator(formencode.validators.FancyValidator):
684 684 messages = {
685 685 'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
686 686 }
687 687
688 688 def _to_python(self, value, state):
689 689 return value.lower()
690 690
691 691 def validate_python(self, value, state):
692 692 user = User.get_by_email(value, case_insensitive=True)
693 693 if user is None:
694 694 msg = M(self, 'non_existing_email', state, email=value)
695 695 raise formencode.Invalid(msg, value, state,
696 696 error_dict=dict(email=msg)
697 697 )
698 698
699 699 return _validator
700 700
701 701
702 702 def LdapLibValidator():
703 703 class _validator(formencode.validators.FancyValidator):
704 704 messages = {
705 705
706 706 }
707 707
708 708 def validate_python(self, value, state):
709 709 try:
710 710 import ldap
711 711 ldap # pyflakes silence !
712 712 except ImportError:
713 713 raise LdapImportError()
714 714
715 715 return _validator
716 716
717 717
718 718 def AttrLoginValidator():
719 719 class _validator(formencode.validators.FancyValidator):
720 720 messages = {
721 721 'invalid_cn':
722 722 _(u'The LDAP Login attribute of the CN must be specified - '
723 723 'this is the name of the attribute that is equivalent '
724 724 'to "username"')
725 725 }
726
727 def validate_python(self, value, state):
728 if not value or not isinstance(value, (str, unicode)):
729 msg = M(self, 'invalid_cn', state)
730 raise formencode.Invalid(msg, value, state,
731 error_dict=dict(ldap_attr_login=msg)
732 )
726 messages['empty'] = messages['invalid_cn']
733 727
734 728 return _validator
735 729
736 730
737 731 def NotReviewedRevisions(repo_id):
738 732 class _validator(formencode.validators.FancyValidator):
739 733 messages = {
740 734 'rev_already_reviewed':
741 735 _(u'Revisions %(revs)s are already part of pull request '
742 736 'or have set status')
743 737 }
744 738
745 739 def validate_python(self, value, state):
746 740 # check revisions if they are not reviewed, or a part of another
747 741 # pull request
748 742 statuses = ChangesetStatus.query()\
749 743 .filter(ChangesetStatus.revision.in_(value))\
750 744 .filter(ChangesetStatus.repo_id == repo_id)\
751 745 .all()
752 746
753 747 errors = []
754 748 for cs in statuses:
755 749 if cs.pull_request_id:
756 750 errors.append(['pull_req', cs.revision[:12]])
757 751 elif cs.status:
758 752 errors.append(['status', cs.revision[:12]])
759 753
760 754 if errors:
761 755 revs = ','.join([x[1] for x in errors])
762 756 msg = M(self, 'rev_already_reviewed', state, revs=revs)
763 757 raise formencode.Invalid(msg, value, state,
764 758 error_dict=dict(revisions=revs)
765 759 )
766 760
767 761 return _validator
768 762
769 763
770 764 def ValidIp():
771 765 class _validator(CIDR):
772 766 messages = dict(
773 767 badFormat=_('Please enter a valid IPv4 or IpV6 address'),
774 768 illegalBits=_('The network size (bits) must be within the range'
775 769 ' of 0-32 (not %(bits)r)'))
776 770
777 771 def to_python(self, value, state):
778 772 v = super(_validator, self).to_python(value, state)
779 773 v = v.strip()
780 774 net = ipaddr.IPNetwork(address=v)
781 775 if isinstance(net, ipaddr.IPv4Network):
782 776 #if IPv4 doesn't end with a mask, add /32
783 777 if '/' not in value:
784 778 v += '/32'
785 779 if isinstance(net, ipaddr.IPv6Network):
786 780 #if IPv6 doesn't end with a mask, add /128
787 781 if '/' not in value:
788 782 v += '/128'
789 783 return v
790 784
791 785 def validate_python(self, value, state):
792 786 try:
793 787 addr = value.strip()
794 788 #this raises an ValueError if address is not IpV4 or IpV6
795 789 ipaddr.IPNetwork(address=addr)
796 790 except ValueError:
797 791 raise formencode.Invalid(self.message('badFormat', state),
798 792 value, state)
799 793
800 794 return _validator
801 795
802 796
803 797 def FieldKey():
804 798 class _validator(formencode.validators.FancyValidator):
805 799 messages = dict(
806 800 badFormat=_('Key name can only consist of letters, '
807 801 'underscore, dash or numbers'),)
808 802
809 803 def validate_python(self, value, state):
810 804 if not re.match('[a-zA-Z0-9_-]+$', value):
811 805 raise formencode.Invalid(self.message('badFormat', state),
812 806 value, state)
813 807 return _validator
@@ -1,175 +1,184 b''
1 1 """Pylons application test package
2 2
3 3 This package assumes the Pylons environment is already loaded, such as
4 4 when this script is imported from the `nosetests --with-pylons=test.ini`
5 5 command.
6 6
7 7 This module initializes the application via ``websetup`` (`paster
8 8 setup-app`) and provides the base testing objects.
9 9
10 10 nosetests -x - fail on first error
11 11 nosetests rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
12 12 nosetests --pdb --pdb-failures
13 13 nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
14 14
15 15 optional FLAGS:
16 16 RC_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
17 17 RC_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
18 18
19 19 """
20 20 import os
21 21 import time
22 22 import logging
23 23 import datetime
24 24 import hashlib
25 25 import tempfile
26 26 from os.path import join as jn
27 27
28 28 from unittest import TestCase
29 29 from tempfile import _RandomNameSequence
30 30
31 31 from paste.deploy import loadapp
32 32 from paste.script.appinstall import SetupCommand
33 33 from pylons import config, url
34 34 from routes.util import URLGenerator
35 35 from webtest import TestApp
36 36 from nose.plugins.skip import SkipTest
37 37
38 38 from rhodecode import is_windows
39 39 from rhodecode.model.meta import Session
40 40 from rhodecode.model.db import User
41 41 from rhodecode.tests.nose_parametrized import parameterized
42 42
43 43 import pylons.test
44 44 from rhodecode.lib.utils2 import safe_unicode, safe_str
45 45
46 46
47 47 os.environ['TZ'] = 'UTC'
48 48 if not is_windows:
49 49 time.tzset()
50 50
51 51 log = logging.getLogger(__name__)
52 52
53 53 __all__ = [
54 54 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
55 'SkipTest',
55 'SkipTest', 'ldap_lib_installed',
56 56 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
57 57 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
58 58 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
59 59 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
60 60 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
61 61 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
62 62 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
63 63 'GIT_REMOTE_REPO', 'SCM_TESTS',
64 64 ]
65 65
66 66 # Invoke websetup with the current config file
67 67 # SetupCommand('setup-app').run([config_file])
68 68
69 69 environ = {}
70 70
71 71 #SOME GLOBALS FOR TESTS
72 72
73 73 TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
74 74 TEST_USER_ADMIN_LOGIN = 'test_admin'
75 75 TEST_USER_ADMIN_PASS = 'test12'
76 76 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
77 77
78 78 TEST_USER_REGULAR_LOGIN = 'test_regular'
79 79 TEST_USER_REGULAR_PASS = 'test12'
80 80 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
81 81
82 82 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
83 83 TEST_USER_REGULAR2_PASS = 'test12'
84 84 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
85 85
86 86 HG_REPO = 'vcs_test_hg'
87 87 GIT_REPO = 'vcs_test_git'
88 88
89 89 NEW_HG_REPO = 'vcs_test_hg_new'
90 90 NEW_GIT_REPO = 'vcs_test_git_new'
91 91
92 92 HG_FORK = 'vcs_test_hg_fork'
93 93 GIT_FORK = 'vcs_test_git_fork'
94 94
95 95 ## VCS
96 96 SCM_TESTS = ['hg', 'git']
97 97 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
98 98
99 99 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
100 100
101 101 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
102 102 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
103 103 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
104 104
105 105
106 106 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
107 107
108 108 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
109 109 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
110 110 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
111 111
112 112 TEST_DIR = tempfile.gettempdir()
113 113 TEST_REPO_PREFIX = 'vcs-test'
114 114
115 115 # cached repos if any !
116 116 # comment out to get some other repos from bb or github
117 117 GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
118 118 HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
119 119
120 #skip ldap tests if LDAP lib is not installed
121 ldap_lib_installed = False
122 try:
123 import ldap
124 ldap_lib_installed = True
125 except ImportError:
126 # means that python-ldap is not installed
127 pass
128
120 129
121 130 def get_new_dir(title):
122 131 """
123 132 Returns always new directory path.
124 133 """
125 134 from rhodecode.tests.vcs.utils import get_normalized_path
126 135 name = TEST_REPO_PREFIX
127 136 if title:
128 137 name = '-'.join((name, title))
129 138 hex = hashlib.sha1(str(time.time())).hexdigest()
130 139 name = '-'.join((name, hex))
131 140 path = os.path.join(TEST_DIR, name)
132 141 return get_normalized_path(path)
133 142
134 143
135 144 class TestController(TestCase):
136 145
137 146 def __init__(self, *args, **kwargs):
138 147 wsgiapp = pylons.test.pylonsapp
139 148 config = wsgiapp.config
140 149
141 150 self.app = TestApp(wsgiapp)
142 151 url._push_object(URLGenerator(config['routes.map'], environ))
143 152 self.Session = Session
144 153 self.index_location = config['app_conf']['index_dir']
145 154 TestCase.__init__(self, *args, **kwargs)
146 155
147 156 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
148 157 password=TEST_USER_ADMIN_PASS):
149 158 self._logged_username = username
150 159 response = self.app.post(url(controller='login', action='index'),
151 160 {'username': username,
152 161 'password': password})
153 162
154 163 if 'invalid user name' in response.body:
155 164 self.fail('could not login using %s %s' % (username, password))
156 165
157 166 self.assertEqual(response.status, '302 Found')
158 167 ses = response.session['rhodecode_user']
159 168 self.assertEqual(ses.get('username'), username)
160 169 response = response.follow()
161 170 self.assertEqual(ses.get('is_authenticated'), True)
162 171
163 172 return response.session['rhodecode_user']
164 173
165 174 def _get_logged_user(self):
166 175 return User.get_by_username(self._logged_username)
167 176
168 177 def checkSessionFlash(self, response, msg):
169 178 self.assertTrue('flash' in response.session,
170 179 msg='Response session:%r have no flash'
171 180 % response.session)
172 181 if not msg in response.session['flash'][0][1]:
173 182 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
174 183 msg, response.session['flash'][0][1])
175 184 self.fail(safe_str(msg))
@@ -1,85 +1,98 b''
1 1 from rhodecode.tests import *
2 2 from rhodecode.model.db import RhodeCodeSetting
3 3
4 skip_ldap_test = False
5 try:
6 import ldap
7 except ImportError:
8 # means that python-ldap is not installed
9 skip_ldap_test = True
10 pass
11
12
13 4 class TestLdapSettingsController(TestController):
14 5
15 6 def test_index(self):
16 7 self.log_user()
17 8 response = self.app.get(url(controller='admin/ldap_settings',
18 9 action='index'))
19 10 response.mustcontain('LDAP administration')
20 11
21 12 def test_ldap_save_settings(self):
22 13 self.log_user()
23 if skip_ldap_test:
14 if ldap_lib_installed:
24 15 raise SkipTest('skipping due to missing ldap lib')
25 16
26 17 test_url = url(controller='admin/ldap_settings',
27 18 action='ldap_settings')
28 19
29 20 response = self.app.post(url=test_url,
30 21 params={'ldap_host' : u'dc.example.com',
31 22 'ldap_port' : '999',
32 23 'ldap_tls_kind' : 'PLAIN',
33 24 'ldap_tls_reqcert' : 'NEVER',
34 25 'ldap_dn_user':'test_user',
35 26 'ldap_dn_pass':'test_pass',
36 27 'ldap_base_dn':'test_base_dn',
37 28 'ldap_filter':'test_filter',
38 29 'ldap_search_scope':'BASE',
39 30 'ldap_attr_login':'test_attr_login',
40 31 'ldap_attr_firstname':'ima',
41 32 'ldap_attr_lastname':'tester',
42 33 'ldap_attr_email':'test@example.com' })
43 34
44 35 new_settings = RhodeCodeSetting.get_ldap_settings()
45 36 self.assertEqual(new_settings['ldap_host'], u'dc.example.com',
46 37 'fail db write compare')
47 38
48 39 self.checkSessionFlash(response,
49 40 'LDAP settings updated successfully')
50 41
51 def test_ldap_error_form(self):
42 def test_ldap_error_form_wrong_port_number(self):
52 43 self.log_user()
53 if skip_ldap_test:
44 if ldap_lib_installed:
54 45 raise SkipTest('skipping due to missing ldap lib')
55 46
56 47 test_url = url(controller='admin/ldap_settings',
57 48 action='ldap_settings')
58 49
59 50 response = self.app.post(url=test_url,
60 51 params={'ldap_host' : '',
61 'ldap_port' : 'i-should-be-number',
52 'ldap_port': 'i-should-be-number', # bad port num
53 'ldap_tls_kind': 'PLAIN',
54 'ldap_tls_reqcert': 'NEVER',
55 'ldap_dn_user': '',
56 'ldap_dn_pass': '',
57 'ldap_base_dn': '',
58 'ldap_filter': '',
59 'ldap_search_scope': 'BASE',
60 'ldap_attr_login': '',
61 'ldap_attr_firstname': '',
62 'ldap_attr_lastname': '',
63 'ldap_attr_email': ''})
64
65 response.mustcontain("""<span class="error-message">"""
66 """Please enter a number</span><br />""")
67
68 def test_ldap_error_form(self):
69 self.log_user()
70 if ldap_lib_installed:
71 raise SkipTest('skipping due to missing ldap lib')
72
73 test_url = url(controller='admin/ldap_settings',
74 action='ldap_settings')
75
76 response = self.app.post(url=test_url,
77 params={'ldap_host': 'Host',
78 'ldap_port': '123',
62 79 'ldap_tls_kind' : 'PLAIN',
63 80 'ldap_tls_reqcert' : 'NEVER',
64 81 'ldap_dn_user':'',
65 82 'ldap_dn_pass':'',
66 83 'ldap_base_dn':'',
67 84 'ldap_filter':'',
68 85 'ldap_search_scope':'BASE',
69 86 'ldap_attr_login':'', # <----- missing required input
70 87 'ldap_attr_firstname':'',
71 88 'ldap_attr_lastname':'',
72 89 'ldap_attr_email':'' })
73 90
74 91 response.mustcontain("""<span class="error-message">The LDAP Login"""
75 92 """ attribute of the CN must be specified""")
76 93
77
78 response.mustcontain("""<span class="error-message">Please """
79 """enter a number</span>""")
80
81 94 def test_ldap_login(self):
82 95 pass
83 96
84 97 def test_ldap_login_incorrect(self):
85 98 pass
@@ -1,248 +1,252 b''
1 1 # -*- coding: utf-8 -*-
2 2 import unittest
3 3 import formencode
4 4
5 5 from rhodecode.tests import *
6 6
7 7 from rhodecode.model import validators as v
8 8 from rhodecode.model.users_group import UserGroupModel
9 9
10 10 from rhodecode.model.meta import Session
11 11 from rhodecode.model.repos_group import ReposGroupModel
12 12 from rhodecode.config.routing import ADMIN_PREFIX
13 13 from rhodecode.model.db import ChangesetStatus, Repository
14 14 from rhodecode.model.changeset_status import ChangesetStatusModel
15 15
16 16
17 17 class TestReposGroups(unittest.TestCase):
18 18
19 19 def setUp(self):
20 20 pass
21 21
22 22 def tearDown(self):
23 23 Session.remove()
24 24
25 25 def test_Message_extractor(self):
26 26 validator = v.ValidUsername()
27 27 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
28 28
29 29 class StateObj(object):
30 30 pass
31 31
32 32 self.assertRaises(formencode.Invalid,
33 33 validator.to_python, 'default', StateObj)
34 34
35 35 def test_ValidUsername(self):
36 36 validator = v.ValidUsername()
37 37
38 38 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
39 39 self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
40 40 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
41 41 self.assertRaises(formencode.Invalid, validator.to_python,
42 42 TEST_USER_ADMIN_LOGIN)
43 43 self.assertEqual('test', validator.to_python('test'))
44 44
45 45 validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
46 46
47 47 def test_ValidRepoUser(self):
48 48 validator = v.ValidRepoUser()
49 49 self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
50 50 self.assertEqual(TEST_USER_ADMIN_LOGIN,
51 51 validator.to_python(TEST_USER_ADMIN_LOGIN))
52 52
53 53 def test_ValidUserGroup(self):
54 54 validator = v.ValidUserGroup()
55 55 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
56 56 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
57 57
58 58 gr = UserGroupModel().create('test')
59 59 gr2 = UserGroupModel().create('tes2')
60 60 Session.commit()
61 61 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
62 62 assert gr.users_group_id != None
63 63 validator = v.ValidUserGroup(edit=True,
64 64 old_data={'users_group_id':
65 65 gr2.users_group_id})
66 66
67 67 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
68 68 self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
69 69 self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
70 70 UserGroupModel().delete(gr)
71 71 UserGroupModel().delete(gr2)
72 72 Session.commit()
73 73
74 74 def test_ValidReposGroup(self):
75 75 validator = v.ValidReposGroup()
76 76 model = ReposGroupModel()
77 77 self.assertRaises(formencode.Invalid, validator.to_python,
78 78 {'group_name': HG_REPO, })
79 79 gr = model.create(group_name='test_gr', group_description='desc',
80 80 parent=None,
81 81 just_db=True,
82 82 owner=TEST_USER_ADMIN_LOGIN)
83 83 self.assertRaises(formencode.Invalid,
84 84 validator.to_python, {'group_name': gr.group_name, })
85 85
86 86 validator = v.ValidReposGroup(edit=True,
87 87 old_data={'group_id': gr.group_id})
88 88 self.assertRaises(formencode.Invalid,
89 89 validator.to_python, {
90 90 'group_name': gr.group_name + 'n',
91 91 'group_parent_id': gr.group_id
92 92 })
93 93 model.delete(gr)
94 94
95 95 def test_ValidPassword(self):
96 96 validator = v.ValidPassword()
97 97 self.assertEqual('lol', validator.to_python('lol'))
98 98 self.assertEqual(None, validator.to_python(None))
99 99 self.assertRaises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
100 100
101 101 def test_ValidPasswordsMatch(self):
102 102 validator = v.ValidPasswordsMatch()
103 103 self.assertRaises(formencode.Invalid,
104 104 validator.to_python, {'password': 'pass',
105 105 'password_confirmation': 'pass2'})
106 106
107 107 self.assertRaises(formencode.Invalid,
108 108 validator.to_python, {'new_password': 'pass',
109 109 'password_confirmation': 'pass2'})
110 110
111 111 self.assertEqual({'new_password': 'pass',
112 112 'password_confirmation': 'pass'},
113 113 validator.to_python({'new_password': 'pass',
114 114 'password_confirmation': 'pass'}))
115 115
116 116 self.assertEqual({'password': 'pass',
117 117 'password_confirmation': 'pass'},
118 118 validator.to_python({'password': 'pass',
119 119 'password_confirmation': 'pass'}))
120 120
121 121 def test_ValidAuth(self):
122 122 validator = v.ValidAuth()
123 123 valid_creds = {
124 124 'username': TEST_USER_REGULAR2_LOGIN,
125 125 'password': TEST_USER_REGULAR2_PASS,
126 126 }
127 127 invalid_creds = {
128 128 'username': 'err',
129 129 'password': 'err',
130 130 }
131 131 self.assertEqual(valid_creds, validator.to_python(valid_creds))
132 132 self.assertRaises(formencode.Invalid,
133 133 validator.to_python, invalid_creds)
134 134
135 135 def test_ValidAuthToken(self):
136 136 validator = v.ValidAuthToken()
137 137 # this is untestable without a threadlocal
138 138 # self.assertRaises(formencode.Invalid,
139 139 # validator.to_python, 'BadToken')
140 140 validator
141 141
142 142 def test_ValidRepoName(self):
143 143 validator = v.ValidRepoName()
144 144
145 145 self.assertRaises(formencode.Invalid,
146 146 validator.to_python, {'repo_name': ''})
147 147
148 148 self.assertRaises(formencode.Invalid,
149 149 validator.to_python, {'repo_name': HG_REPO})
150 150
151 151 gr = ReposGroupModel().create(group_name='group_test',
152 152 group_description='desc',
153 153 parent=None,
154 154 owner=TEST_USER_ADMIN_LOGIN)
155 155 self.assertRaises(formencode.Invalid,
156 156 validator.to_python, {'repo_name': gr.group_name})
157 157
158 158 #TODO: write an error case for that ie. create a repo withinh a group
159 159 # self.assertRaises(formencode.Invalid,
160 160 # validator.to_python, {'repo_name': 'some',
161 161 # 'repo_group': gr.group_id})
162 162
163 163 def test_ValidForkName(self):
164 164 # this uses ValidRepoName validator
165 165 assert True
166 166
167 167 @parameterized.expand([
168 168 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
169 169 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
170 170 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
171 171 ('/]re po', 're-po')])
172 172 def test_SlugifyName(self, name, expected):
173 173 validator = v.SlugifyName()
174 174 self.assertEqual(expected, validator.to_python(name))
175 175
176 176 def test_ValidCloneUri(self):
177 177 #TODO: write this one
178 178 pass
179 179
180 180 def test_ValidForkType(self):
181 181 validator = v.ValidForkType(old_data={'repo_type': 'hg'})
182 182 self.assertEqual('hg', validator.to_python('hg'))
183 183 self.assertRaises(formencode.Invalid, validator.to_python, 'git')
184 184
185 185 def test_ValidPerms(self):
186 186 #TODO: write this one
187 187 pass
188 188
189 189 def test_ValidSettings(self):
190 190 validator = v.ValidSettings()
191 191 self.assertEqual({'pass': 'pass'},
192 192 validator.to_python(value={'user': 'test',
193 193 'pass': 'pass'}))
194 194
195 195 self.assertEqual({'user2': 'test', 'pass': 'pass'},
196 196 validator.to_python(value={'user2': 'test',
197 197 'pass': 'pass'}))
198 198
199 199 def test_ValidPath(self):
200 200 validator = v.ValidPath()
201 201 self.assertEqual(TESTS_TMP_PATH,
202 202 validator.to_python(TESTS_TMP_PATH))
203 203 self.assertRaises(formencode.Invalid, validator.to_python,
204 204 '/no_such_dir')
205 205
206 206 def test_UniqSystemEmail(self):
207 207 validator = v.UniqSystemEmail(old_data={})
208 208
209 209 self.assertEqual('mail@python.org',
210 210 validator.to_python('MaiL@Python.org'))
211 211
212 212 email = TEST_USER_REGULAR2_EMAIL
213 213 self.assertRaises(formencode.Invalid, validator.to_python, email)
214 214
215 215 def test_ValidSystemEmail(self):
216 216 validator = v.ValidSystemEmail()
217 217 email = TEST_USER_REGULAR2_EMAIL
218 218
219 219 self.assertEqual(email, validator.to_python(email))
220 220 self.assertRaises(formencode.Invalid, validator.to_python, 'err')
221 221
222 222 def test_LdapLibValidator(self):
223 if ldap_lib_installed:
224 validator = v.LdapLibValidator()
225 self.assertEqual("DN", validator.to_python('DN'))
226 else:
223 227 validator = v.LdapLibValidator()
224 228 self.assertRaises(v.LdapImportError, validator.to_python, 'err')
225 229
226 230 def test_AttrLoginValidator(self):
227 231 validator = v.AttrLoginValidator()
228 self.assertRaises(formencode.Invalid, validator.to_python, 123)
232 self.assertEqual('DN_attr', validator.to_python('DN_attr'))
229 233
230 234 def test_NotReviewedRevisions(self):
231 235 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
232 236 validator = v.NotReviewedRevisions(repo_id)
233 237 rev = '0' * 40
234 238 # add status for a rev, that should throw an error because it is already
235 239 # reviewed
236 240 new_status = ChangesetStatus()
237 241 new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
238 242 new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
239 243 new_status.status = ChangesetStatus.STATUS_APPROVED
240 244 new_status.comment = None
241 245 new_status.revision = rev
242 246 Session().add(new_status)
243 247 Session().commit()
244 248 try:
245 249 self.assertRaises(formencode.Invalid, validator.to_python, [rev])
246 250 finally:
247 251 Session().delete(new_status)
248 252 Session().commit()
General Comments 0
You need to be logged in to leave comments. Login now