##// END OF EJS Templates
fix(validators): fixed improved error message on user group validators
super-admin -
r5267:9a7f2cde default
parent child Browse files
Show More
@@ -1,1115 +1,1111 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 """
20 20 Set of generic validators
21 21 """
22 22
23 23
24 24 import os
25 25 import re
26 26 import logging
27 27 import collections
28 28
29 29 import formencode
30 30 import ipaddress
31 31 from formencode.validators import (
32 32 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
33 33 NotEmpty, IPAddress, CIDR, String, FancyValidator
34 34 )
35 35
36 36 from sqlalchemy.sql.expression import true
37 37 from sqlalchemy.util import OrderedSet
38 38
39 39 from rhodecode.authentication import (
40 40 legacy_plugin_prefix, _import_legacy_plugin)
41 41 from rhodecode.authentication.base import loadplugin
42 42 from rhodecode.apps._base import ADMIN_PREFIX
43 43 from rhodecode.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny
44 44 from rhodecode.lib.utils import repo_name_slug, make_db_config
45 45 from rhodecode.lib.utils2 import safe_int, str2bool, aslist
46 46 from rhodecode.lib.str_utils import safe_str
47 47 from rhodecode.lib.hash_utils import md5_safe
48 48 from rhodecode.lib.vcs.backends.git.repository import GitRepository
49 49 from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
50 50 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
51 51 from rhodecode.model.db import (
52 52 RepoGroup, Repository, UserGroup, User, ChangesetStatus, Gist)
53 53 from rhodecode.model.settings import VcsSettingsModel
54 54
55 55 # silence warnings and pylint
56 56 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
57 57 NotEmpty, IPAddress, CIDR, String, FancyValidator
58 58
59 59 log = logging.getLogger(__name__)
60 60
61 61
62 62 class _Missing(object):
63 63 pass
64 64
65 65
66 66 Missing = _Missing()
67 67
68 68
69 69 def M(self, key, state, **kwargs):
70 70 """
71 71 returns string from self.message based on given key,
72 72 passed kw params are used to substitute %(named)s params inside
73 73 translated strings
74 74
75 75 :param msg:
76 76 :param state:
77 77 """
78 78
79 79 #state._ = staticmethod(_)
80 80 # inject validator into state object
81 81 return self.message(key, state, **kwargs)
82 82
83 83
84 84 def UniqueList(localizer, convert=None):
85 85 _ = localizer
86 86
87 87 class _validator(formencode.FancyValidator):
88 88 """
89 89 Unique List !
90 90 """
91 91 accept_iterator = True
92 92
93 93 messages = {
94 94 'empty': _('Value cannot be an empty list'),
95 95 'missing_value': _('Value cannot be an empty list'),
96 96 }
97 97
98 98 def _convert_to_python(self, value, state):
99 99
100 100 def make_unique(_value):
101 101 seen = []
102 102 return [c for c in _value if not (c in seen or seen.append(c))]
103 103
104 104 if isinstance(value, list):
105 105 ret_val = make_unique(value)
106 106 elif isinstance(value, set):
107 107 ret_val = make_unique(list(value))
108 108 elif isinstance(value, tuple):
109 109 ret_val = make_unique(list(value))
110 110 elif value is None:
111 111 ret_val = []
112 112 else:
113 113 ret_val = [value]
114 114
115 115 if convert:
116 116 ret_val = list(map(convert, ret_val))
117 117 return ret_val
118 118
119 119 def empty_value(self, value):
120 120 return []
121 121
122 122 return _validator
123 123
124 124
125 125 def UniqueListFromString(localizer):
126 126 _ = localizer
127 127
128 128 class _validator(UniqueList(localizer)):
129 129 def _convert_to_python(self, value, state):
130 130 if isinstance(value, str):
131 131 value = aslist(value, ',')
132 132 return super()._convert_to_python(value, state)
133 133 return _validator
134 134
135 135
136 136 def ValidSvnPattern(localizer, section, repo_name=None):
137 137 _ = localizer
138 138
139 139 class _validator(formencode.validators.FancyValidator):
140 140 messages = {
141 141 'pattern_exists': _('Pattern already exists'),
142 142 }
143 143
144 144 def _validate_python(self, value, state):
145 145 if not value:
146 146 return
147 147 model = VcsSettingsModel(repo=repo_name)
148 148 ui_settings = model.get_svn_patterns(section=section)
149 149 for entry in ui_settings:
150 150 if value == entry.value:
151 151 msg = M(self, 'pattern_exists', state)
152 152 raise formencode.Invalid(msg, value, state)
153 153 return _validator
154 154
155 155
156 156 def ValidUsername(localizer, edit=False, old_data=None):
157 157 _ = localizer
158 158 old_data = old_data or {}
159 159
160 160 class _validator(formencode.validators.FancyValidator):
161 161 messages = {
162 162 'username_exists': _('Username "%(username)s" already exists'),
163 163 'system_invalid_username':
164 164 _('Username "%(username)s" is forbidden'),
165 165 'invalid_username':
166 166 _('Username may only contain alphanumeric characters '
167 'underscores, periods or dashes and must begin with '
168 'alphanumeric character or underscore')
167 'underscores, periods or dashes and must begin with '
168 'alphanumeric character or underscore')
169 169 }
170 170
171 171 def _validate_python(self, value, state):
172 172 if value in ['default', 'new_user']:
173 173 msg = M(self, 'system_invalid_username', state, username=value)
174 174 raise formencode.Invalid(msg, value, state)
175 175 # check if user is unique
176 176 old_un = None
177 177 if edit:
178 178 old_un = User.get(old_data.get('user_id')).username
179 179
180 180 if old_un != value or not edit:
181 181 if User.get_by_username(value, case_insensitive=True):
182 182 msg = M(self, 'username_exists', state, username=value)
183 183 raise formencode.Invalid(msg, value, state)
184 184
185 185 if (re.match(r'^[\w]{1}[\w\-\.]{0,254}$', value)
186 186 is None):
187 187 msg = M(self, 'invalid_username', state)
188 188 raise formencode.Invalid(msg, value, state)
189 189 return _validator
190 190
191 191
192 192 def ValidRepoUser(localizer, allow_disabled=False):
193 193 _ = localizer
194 194
195 195 class _validator(formencode.validators.FancyValidator):
196 196 messages = {
197 197 'invalid_username': _('Username %(username)s is not valid'),
198 198 'disabled_username': _('Username %(username)s is disabled')
199 199 }
200 200
201 201 def _validate_python(self, value, state):
202 202 try:
203 203 user = User.query().filter(User.username == value).one()
204 204 except Exception:
205 205 msg = M(self, 'invalid_username', state, username=value)
206 206 raise formencode.Invalid(
207 207 msg, value, state, error_dict={'username': msg}
208 208 )
209 209 if user and (not allow_disabled and not user.active):
210 210 msg = M(self, 'disabled_username', state, username=value)
211 211 raise formencode.Invalid(
212 212 msg, value, state, error_dict={'username': msg}
213 213 )
214 214 return _validator
215 215
216 216
217 217 def ValidUserGroup(localizer, edit=False, old_data=None):
218 218 _ = localizer
219 219 old_data = old_data or {}
220 220
221 221 class _validator(formencode.validators.FancyValidator):
222 222 messages = {
223 223 'invalid_group': _('Invalid user group name'),
224 224 'group_exist': _('User group `%(usergroup)s` already exists'),
225 225 'invalid_usergroup_name':
226 _('user group name may only contain alphanumeric '
226 _('User group name may only contain alphanumeric '
227 227 'characters underscores, periods or dashes and must begin '
228 228 'with alphanumeric character')
229 229 }
230 230
231 231 def _validate_python(self, value, state):
232 232 if value in ['default']:
233 233 msg = M(self, 'invalid_group', state)
234 raise formencode.Invalid(
235 msg, value, state, error_dict={'users_group_name': msg}
236 )
234 raise formencode.Invalid(msg, value, state)
237 235 # check if group is unique
238 236 old_ugname = None
239 237 if edit:
240 238 old_id = old_data.get('users_group_id')
241 239 old_ugname = UserGroup.get(old_id).users_group_name
242 240
243 241 if old_ugname != value or not edit:
244 242 is_existing_group = UserGroup.get_by_group_name(
245 243 value, case_insensitive=True)
246 244 if is_existing_group:
247 245 msg = M(self, 'group_exist', state, usergroup=value)
248 246 raise formencode.Invalid(
249 247 msg, value, state, error_dict={'users_group_name': msg}
250 248 )
251 249
252 250 if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
253 251 msg = M(self, 'invalid_usergroup_name', state)
254 raise formencode.Invalid(
255 msg, value, state, error_dict={'users_group_name': msg}
256 )
252 raise formencode.Invalid(msg, value, state)
257 253 return _validator
258 254
259 255
260 256 def ValidRepoGroup(localizer, edit=False, old_data=None, can_create_in_root=False):
261 257 _ = localizer
262 258 old_data = old_data or {}
263 259
264 260 class _validator(formencode.validators.FancyValidator):
265 261 messages = {
266 262 'group_parent_id': _('Cannot assign this group as parent'),
267 263 'group_exists': _('Group "%(group_name)s" already exists'),
268 264 'repo_exists': _('Repository with name "%(group_name)s" '
269 265 'already exists'),
270 266 'permission_denied': _("no permission to store repository group"
271 267 "in this location"),
272 268 'permission_denied_root': _(
273 269 "no permission to store repository group "
274 270 "in root location")
275 271 }
276 272
277 273 def _convert_to_python(self, value, state):
278 274 group_name = repo_name_slug(value.get('group_name', ''))
279 275 group_parent_id = safe_int(value.get('group_parent_id'))
280 276 gr = RepoGroup.get(group_parent_id)
281 277 if gr:
282 278 parent_group_path = gr.full_path
283 279 # value needs to be aware of group name in order to check
284 280 # db key This is an actual just the name to store in the
285 281 # database
286 282 group_name_full = (
287 283 parent_group_path + RepoGroup.url_sep() + group_name)
288 284 else:
289 285 group_name_full = group_name
290 286
291 287 value['group_name'] = group_name
292 288 value['group_name_full'] = group_name_full
293 289 value['group_parent_id'] = group_parent_id
294 290 return value
295 291
296 292 def _validate_python(self, value, state):
297 293
298 294 old_group_name = None
299 295 group_name = value.get('group_name')
300 296 group_name_full = value.get('group_name_full')
301 297 group_parent_id = safe_int(value.get('group_parent_id'))
302 298 if group_parent_id == -1:
303 299 group_parent_id = None
304 300
305 301 group_obj = RepoGroup.get(old_data.get('group_id'))
306 302 parent_group_changed = False
307 303 if edit:
308 304 old_group_name = group_obj.group_name
309 305 old_group_parent_id = group_obj.group_parent_id
310 306
311 307 if group_parent_id != old_group_parent_id:
312 308 parent_group_changed = True
313 309
314 310 # TODO: mikhail: the following if statement is not reached
315 311 # since group_parent_id's OneOf validation fails before.
316 312 # Can be removed.
317 313
318 314 # check against setting a parent of self
319 315 parent_of_self = (
320 316 old_data['group_id'] == group_parent_id
321 317 if group_parent_id else False
322 318 )
323 319 if parent_of_self:
324 320 msg = M(self, 'group_parent_id', state)
325 321 raise formencode.Invalid(
326 322 msg, value, state, error_dict={'group_parent_id': msg}
327 323 )
328 324
329 325 # group we're moving current group inside
330 326 child_group = None
331 327 if group_parent_id:
332 328 child_group = RepoGroup.query().filter(
333 329 RepoGroup.group_id == group_parent_id).scalar()
334 330
335 331 # do a special check that we cannot move a group to one of
336 332 # it's children
337 333 if edit and child_group:
338 334 parents = [x.group_id for x in child_group.parents]
339 335 move_to_children = old_data['group_id'] in parents
340 336 if move_to_children:
341 337 msg = M(self, 'group_parent_id', state)
342 338 raise formencode.Invalid(
343 339 msg, value, state, error_dict={'group_parent_id': msg})
344 340
345 341 # Check if we have permission to store in the parent.
346 342 # Only check if the parent group changed.
347 343 if parent_group_changed:
348 344 if child_group is None:
349 345 if not can_create_in_root:
350 346 msg = M(self, 'permission_denied_root', state)
351 347 raise formencode.Invalid(
352 348 msg, value, state,
353 349 error_dict={'group_parent_id': msg})
354 350 else:
355 351 valid = HasRepoGroupPermissionAny('group.admin')
356 352 forbidden = not valid(
357 353 child_group.group_name, 'can create group validator')
358 354 if forbidden:
359 355 msg = M(self, 'permission_denied', state)
360 356 raise formencode.Invalid(
361 357 msg, value, state,
362 358 error_dict={'group_parent_id': msg})
363 359
364 360 # if we change the name or it's new group, check for existing names
365 361 # or repositories with the same name
366 362 if old_group_name != group_name_full or not edit:
367 363 # check group
368 364 gr = RepoGroup.get_by_group_name(group_name_full)
369 365 if gr:
370 366 msg = M(self, 'group_exists', state, group_name=group_name)
371 367 raise formencode.Invalid(
372 368 msg, value, state, error_dict={'group_name': msg})
373 369
374 370 # check for same repo
375 371 repo = Repository.get_by_repo_name(group_name_full)
376 372 if repo:
377 373 msg = M(self, 'repo_exists', state, group_name=group_name)
378 374 raise formencode.Invalid(
379 375 msg, value, state, error_dict={'group_name': msg})
380 376 return _validator
381 377
382 378
383 379 def ValidPassword(localizer):
384 380 _ = localizer
385 381
386 382 class _validator(formencode.validators.FancyValidator):
387 383 messages = {
388 384 'invalid_password':
389 385 _('Invalid characters (non-ascii) in password')
390 386 }
391 387
392 388 def _validate_python(self, value, state):
393 389 if value and not value.isascii():
394 390 msg = M(self, 'invalid_password', state)
395 391 raise formencode.Invalid(msg, value, state,)
396 392 return _validator
397 393
398 394
399 395 def ValidPasswordsMatch(
400 396 localizer, passwd='new_password',
401 397 passwd_confirmation='password_confirmation'):
402 398 _ = localizer
403 399
404 400 class _validator(formencode.validators.FancyValidator):
405 401 messages = {
406 402 'password_mismatch': _('Passwords do not match'),
407 403 }
408 404
409 405 def _validate_python(self, value, state):
410 406
411 407 pass_val = value.get('password') or value.get(passwd)
412 408 if pass_val != value[passwd_confirmation]:
413 409 msg = M(self, 'password_mismatch', state)
414 410 raise formencode.Invalid(
415 411 msg, value, state,
416 412 error_dict={passwd: msg, passwd_confirmation: msg}
417 413 )
418 414 return _validator
419 415
420 416
421 417 def ValidAuth(localizer):
422 418 _ = localizer
423 419
424 420 class _validator(formencode.validators.FancyValidator):
425 421 messages = {
426 422 'invalid_password': _('invalid password'),
427 423 'invalid_username': _('invalid user name'),
428 424 'disabled_account': _('Your account is disabled')
429 425 }
430 426
431 427 def _validate_python(self, value, state):
432 428 from rhodecode.authentication.base import authenticate, HTTP_TYPE
433 429
434 430 password = value['password']
435 431 username = value['username']
436 432
437 433 if not authenticate(username, password, '', HTTP_TYPE,
438 434 skip_missing=True):
439 435 user = User.get_by_username(username)
440 436 if user and not user.active:
441 437 log.warning('user %s is disabled', username)
442 438 msg = M(self, 'disabled_account', state)
443 439 raise formencode.Invalid(
444 440 msg, value, state, error_dict={'username': msg}
445 441 )
446 442 else:
447 443 log.warning('user `%s` failed to authenticate', username)
448 444 msg = M(self, 'invalid_username', state)
449 445 msg2 = M(self, 'invalid_password', state)
450 446 raise formencode.Invalid(
451 447 msg, value, state,
452 448 error_dict={'username': msg, 'password': msg2}
453 449 )
454 450 return _validator
455 451
456 452
457 453 def ValidRepoName(localizer, edit=False, old_data=None):
458 454 old_data = old_data or {}
459 455 _ = localizer
460 456
461 457 class _validator(formencode.validators.FancyValidator):
462 458 messages = {
463 459 'invalid_repo_name':
464 460 _('Repository name %(repo)s is disallowed'),
465 461 # top level
466 462 'repository_exists': _('Repository with name %(repo)s '
467 463 'already exists'),
468 464 'group_exists': _('Repository group with name "%(repo)s" '
469 465 'already exists'),
470 466 # inside a group
471 467 'repository_in_group_exists': _('Repository with name %(repo)s '
472 468 'exists in group "%(group)s"'),
473 469 'group_in_group_exists': _(
474 470 'Repository group with name "%(repo)s" '
475 471 'exists in group "%(group)s"'),
476 472 }
477 473
478 474 def _convert_to_python(self, value, state):
479 475 repo_name = repo_name_slug(value.get('repo_name', ''))
480 476 repo_group = value.get('repo_group')
481 477 if repo_group:
482 478 gr = RepoGroup.get(repo_group)
483 479 group_path = gr.full_path
484 480 group_name = gr.group_name
485 481 # value needs to be aware of group name in order to check
486 482 # db key This is an actual just the name to store in the
487 483 # database
488 484 repo_name_full = group_path + RepoGroup.url_sep() + repo_name
489 485 else:
490 486 group_name = group_path = ''
491 487 repo_name_full = repo_name
492 488
493 489 value['repo_name'] = repo_name
494 490 value['repo_name_full'] = repo_name_full
495 491 value['group_path'] = group_path
496 492 value['group_name'] = group_name
497 493 return value
498 494
499 495 def _validate_python(self, value, state):
500 496
501 497 repo_name = value.get('repo_name')
502 498 repo_name_full = value.get('repo_name_full')
503 499 group_path = value.get('group_path')
504 500 group_name = value.get('group_name')
505 501
506 502 if repo_name in [ADMIN_PREFIX, '']:
507 503 msg = M(self, 'invalid_repo_name', state, repo=repo_name)
508 504 raise formencode.Invalid(
509 505 msg, value, state, error_dict={'repo_name': msg})
510 506
511 507 rename = old_data.get('repo_name') != repo_name_full
512 508 create = not edit
513 509 if rename or create:
514 510
515 511 if group_path:
516 512 if Repository.get_by_repo_name(repo_name_full):
517 513 msg = M(self, 'repository_in_group_exists', state,
518 514 repo=repo_name, group=group_name)
519 515 raise formencode.Invalid(
520 516 msg, value, state, error_dict={'repo_name': msg})
521 517 if RepoGroup.get_by_group_name(repo_name_full):
522 518 msg = M(self, 'group_in_group_exists', state,
523 519 repo=repo_name, group=group_name)
524 520 raise formencode.Invalid(
525 521 msg, value, state, error_dict={'repo_name': msg})
526 522 else:
527 523 if RepoGroup.get_by_group_name(repo_name_full):
528 524 msg = M(self, 'group_exists', state, repo=repo_name)
529 525 raise formencode.Invalid(
530 526 msg, value, state, error_dict={'repo_name': msg})
531 527
532 528 if Repository.get_by_repo_name(repo_name_full):
533 529 msg = M(
534 530 self, 'repository_exists', state, repo=repo_name)
535 531 raise formencode.Invalid(
536 532 msg, value, state, error_dict={'repo_name': msg})
537 533 return value
538 534 return _validator
539 535
540 536
541 537 def ValidForkName(localizer, *args, **kwargs):
542 538 _ = localizer
543 539
544 540 return ValidRepoName(localizer, *args, **kwargs)
545 541
546 542
547 543 def SlugifyName(localizer):
548 544 _ = localizer
549 545
550 546 class _validator(formencode.validators.FancyValidator):
551 547
552 548 def _convert_to_python(self, value, state):
553 549 return repo_name_slug(value)
554 550
555 551 def _validate_python(self, value, state):
556 552 pass
557 553 return _validator
558 554
559 555
560 556 def CannotHaveGitSuffix(localizer):
561 557 _ = localizer
562 558
563 559 class _validator(formencode.validators.FancyValidator):
564 560 messages = {
565 561 'has_git_suffix':
566 562 _('Repository name cannot end with .git'),
567 563 }
568 564
569 565 def _convert_to_python(self, value, state):
570 566 return value
571 567
572 568 def _validate_python(self, value, state):
573 569 if value and value.endswith('.git'):
574 570 msg = M(
575 571 self, 'has_git_suffix', state)
576 572 raise formencode.Invalid(
577 573 msg, value, state, error_dict={'repo_name': msg})
578 574 return _validator
579 575
580 576
581 577 def ValidCloneUri(localizer):
582 578 _ = localizer
583 579
584 580 class InvalidCloneUrl(Exception):
585 581 allowed_prefixes = ()
586 582
587 583 def url_handler(repo_type, url):
588 584 config = make_db_config(clear_session=False)
589 585 if repo_type == 'hg':
590 586 allowed_prefixes = ('http', 'svn+http', 'git+http')
591 587
592 588 if 'http' in url[:4]:
593 589 # initially check if it's at least the proper URL
594 590 # or does it pass basic auth
595 591 MercurialRepository.check_url(url, config)
596 592 elif 'svn+http' in url[:8]: # svn->hg import
597 593 SubversionRepository.check_url(url, config)
598 594 elif 'git+http' in url[:8]: # git->hg import
599 595 raise NotImplementedError()
600 596 else:
601 597 exc = InvalidCloneUrl('Clone from URI %s not allowed. '
602 598 'Allowed url must start with one of %s'
603 599 % (url, ','.join(allowed_prefixes)))
604 600 exc.allowed_prefixes = allowed_prefixes
605 601 raise exc
606 602
607 603 elif repo_type == 'git':
608 604 allowed_prefixes = ('http', 'svn+http', 'hg+http')
609 605 if 'http' in url[:4]:
610 606 # initially check if it's at least the proper URL
611 607 # or does it pass basic auth
612 608 GitRepository.check_url(url, config)
613 609 elif 'svn+http' in url[:8]: # svn->git import
614 610 raise NotImplementedError()
615 611 elif 'hg+http' in url[:8]: # hg->git import
616 612 raise NotImplementedError()
617 613 else:
618 614 exc = InvalidCloneUrl('Clone from URI %s not allowed. '
619 615 'Allowed url must start with one of %s'
620 616 % (url, ','.join(allowed_prefixes)))
621 617 exc.allowed_prefixes = allowed_prefixes
622 618 raise exc
623 619
624 620 class _validator(formencode.validators.FancyValidator):
625 621 messages = {
626 622 'clone_uri': _('invalid clone url or credentials for %(rtype)s repository'),
627 623 'invalid_clone_uri': _(
628 624 'Invalid clone url, provide a valid clone '
629 625 'url starting with one of %(allowed_prefixes)s')
630 626 }
631 627
632 628 def _validate_python(self, value, state):
633 629 repo_type = value.get('repo_type')
634 630 url = value.get('clone_uri')
635 631
636 632 if url:
637 633 try:
638 634 url_handler(repo_type, url)
639 635 except InvalidCloneUrl as e:
640 636 log.warning(e)
641 637 msg = M(self, 'invalid_clone_uri', state, rtype=repo_type,
642 638 allowed_prefixes=','.join(e.allowed_prefixes))
643 639 raise formencode.Invalid(msg, value, state,
644 640 error_dict={'clone_uri': msg})
645 641 except Exception:
646 642 log.exception('Url validation failed')
647 643 msg = M(self, 'clone_uri', state, rtype=repo_type)
648 644 raise formencode.Invalid(msg, value, state,
649 645 error_dict={'clone_uri': msg})
650 646 return _validator
651 647
652 648
653 649 def ValidForkType(localizer, old_data=None):
654 650 _ = localizer
655 651 old_data = old_data or {}
656 652
657 653 class _validator(formencode.validators.FancyValidator):
658 654 messages = {
659 655 'invalid_fork_type': _('Fork have to be the same type as parent')
660 656 }
661 657
662 658 def _validate_python(self, value, state):
663 659 if old_data['repo_type'] != value:
664 660 msg = M(self, 'invalid_fork_type', state)
665 661 raise formencode.Invalid(
666 662 msg, value, state, error_dict={'repo_type': msg}
667 663 )
668 664 return _validator
669 665
670 666
671 667 def CanWriteGroup(localizer, old_data=None):
672 668 _ = localizer
673 669
674 670 class _validator(formencode.validators.FancyValidator):
675 671 messages = {
676 672 'permission_denied': _(
677 673 "You do not have the permission "
678 674 "to create repositories in this group."),
679 675 'permission_denied_root': _(
680 676 "You do not have the permission to store repositories in "
681 677 "the root location.")
682 678 }
683 679
684 680 def _convert_to_python(self, value, state):
685 681 # root location
686 682 if value in [-1, "-1"]:
687 683 return None
688 684 return value
689 685
690 686 def _validate_python(self, value, state):
691 687 gr = RepoGroup.get(value)
692 688 gr_name = gr.group_name if gr else None # None means ROOT location
693 689 # create repositories with write permission on group is set to true
694 690 create_on_write = HasPermissionAny(
695 691 'hg.create.write_on_repogroup.true')()
696 692 group_admin = HasRepoGroupPermissionAny('group.admin')(
697 693 gr_name, 'can write into group validator')
698 694 group_write = HasRepoGroupPermissionAny('group.write')(
699 695 gr_name, 'can write into group validator')
700 696 forbidden = not (group_admin or (group_write and create_on_write))
701 697 can_create_repos = HasPermissionAny(
702 698 'hg.admin', 'hg.create.repository')
703 699 gid = (old_data['repo_group'].get('group_id')
704 700 if (old_data and 'repo_group' in old_data) else None)
705 701 value_changed = gid != safe_int(value)
706 702 new = not old_data
707 703 # do check if we changed the value, there's a case that someone got
708 704 # revoked write permissions to a repository, he still created, we
709 705 # don't need to check permission if he didn't change the value of
710 706 # groups in form box
711 707 if value_changed or new:
712 708 # parent group need to be existing
713 709 if gr and forbidden:
714 710 msg = M(self, 'permission_denied', state)
715 711 raise formencode.Invalid(
716 712 msg, value, state, error_dict={'repo_type': msg}
717 713 )
718 714 # check if we can write to root location !
719 715 elif gr is None and not can_create_repos():
720 716 msg = M(self, 'permission_denied_root', state)
721 717 raise formencode.Invalid(
722 718 msg, value, state, error_dict={'repo_type': msg}
723 719 )
724 720 return _validator
725 721
726 722
727 723 def ValidPerms(localizer, type_='repo'):
728 724 _ = localizer
729 725 if type_ == 'repo_group':
730 726 EMPTY_PERM = 'group.none'
731 727 elif type_ == 'repo':
732 728 EMPTY_PERM = 'repository.none'
733 729 elif type_ == 'user_group':
734 730 EMPTY_PERM = 'usergroup.none'
735 731
736 732 class _validator(formencode.validators.FancyValidator):
737 733 messages = {
738 734 'perm_new_member_name':
739 735 _('This username or user group name is not valid')
740 736 }
741 737
742 738 def _convert_to_python(self, value, state):
743 739 perm_updates = OrderedSet()
744 740 perm_additions = OrderedSet()
745 741 perm_deletions = OrderedSet()
746 742 # build a list of permission to update/delete and new permission
747 743
748 744 # Read the perm_new_member/perm_del_member attributes and group
749 745 # them by they IDs
750 746 new_perms_group = collections.defaultdict(dict)
751 747 del_perms_group = collections.defaultdict(dict)
752 748 for k, v in list(value.copy().items()):
753 749 if k.startswith('perm_del_member'):
754 750 # delete from org storage so we don't process that later
755 751 del value[k]
756 752 # part is `id`, `type`
757 753 _type, part = k.split('perm_del_member_')
758 754 args = part.split('_')
759 755 if len(args) == 2:
760 756 _key, pos = args
761 757 del_perms_group[pos][_key] = v
762 758 if k.startswith('perm_new_member'):
763 759 # delete from org storage so we don't process that later
764 760 del value[k]
765 761 # part is `id`, `type`, `perm`
766 762 _type, part = k.split('perm_new_member_')
767 763 args = part.split('_')
768 764 if len(args) == 2:
769 765 _key, pos = args
770 766 new_perms_group[pos][_key] = v
771 767
772 768 # store the deletes
773 769 for k in sorted(del_perms_group.keys()):
774 770 perm_dict = del_perms_group[k]
775 771 del_member = perm_dict.get('id')
776 772 del_type = perm_dict.get('type')
777 773 if del_member and del_type:
778 774 perm_deletions.add(
779 775 (del_member, None, del_type))
780 776
781 777 # store additions in order of how they were added in web form
782 778 for k in sorted(new_perms_group.keys()):
783 779 perm_dict = new_perms_group[k]
784 780 new_member = perm_dict.get('id')
785 781 new_type = perm_dict.get('type')
786 782 new_perm = perm_dict.get('perm')
787 783 if new_member and new_perm and new_type:
788 784 perm_additions.add(
789 785 (new_member, new_perm, new_type))
790 786
791 787 # get updates of permissions
792 788 # (read the existing radio button states)
793 789 default_user_id = User.get_default_user_id()
794 790
795 791 for k, update_value in list(value.items()):
796 792 if k.startswith('u_perm_') or k.startswith('g_perm_'):
797 793 obj_type = k[0]
798 794 obj_id = k[7:]
799 795 update_type = {'u': 'user',
800 796 'g': 'user_group'}[obj_type]
801 797
802 798 if obj_type == 'u' and safe_int(obj_id) == default_user_id:
803 799 if str2bool(value.get('repo_private')):
804 800 # prevent from updating default user permissions
805 801 # when this repository is marked as private
806 802 update_value = EMPTY_PERM
807 803
808 804 perm_updates.add(
809 805 (obj_id, update_value, update_type))
810 806
811 807 value['perm_additions'] = [] # propagated later
812 808 value['perm_updates'] = list(perm_updates)
813 809 value['perm_deletions'] = list(perm_deletions)
814 810
815 811 updates_map = dict(
816 812 (x[0], (x[1], x[2])) for x in value['perm_updates'])
817 813 # make sure Additions don't override updates.
818 814 for member_id, perm, member_type in list(perm_additions):
819 815 if member_id in updates_map:
820 816 perm = updates_map[member_id][0]
821 817 value['perm_additions'].append((member_id, perm, member_type))
822 818
823 819 # on new entries validate users they exist and they are active !
824 820 # this leaves feedback to the form
825 821 try:
826 822 if member_type == 'user':
827 823 User.query()\
828 824 .filter(User.active == true())\
829 825 .filter(User.user_id == member_id).one()
830 826 if member_type == 'user_group':
831 827 UserGroup.query()\
832 828 .filter(UserGroup.users_group_active == true())\
833 829 .filter(UserGroup.users_group_id == member_id)\
834 830 .one()
835 831
836 832 except Exception:
837 833 log.exception('Updated permission failed: org_exc:')
838 834 msg = M(self, 'perm_new_member_type', state)
839 835 raise formencode.Invalid(
840 836 msg, value, state, error_dict={
841 837 'perm_new_member_name': msg}
842 838 )
843 839 return value
844 840 return _validator
845 841
846 842
847 843 def ValidPath(localizer):
848 844 _ = localizer
849 845
850 846 class _validator(formencode.validators.FancyValidator):
851 847 messages = {
852 848 'invalid_path': _('This is not a valid path')
853 849 }
854 850
855 851 def _validate_python(self, value, state):
856 852 if not os.path.isdir(value):
857 853 msg = M(self, 'invalid_path', state)
858 854 raise formencode.Invalid(
859 855 msg, value, state, error_dict={'paths_root_path': msg}
860 856 )
861 857 return _validator
862 858
863 859
864 860 def UniqSystemEmail(localizer, old_data=None):
865 861 _ = localizer
866 862 old_data = old_data or {}
867 863
868 864 class _validator(formencode.validators.FancyValidator):
869 865 messages = {
870 866 'email_taken': _('This e-mail address is already taken')
871 867 }
872 868
873 869 def _convert_to_python(self, value, state):
874 870 return value.lower()
875 871
876 872 def _validate_python(self, value, state):
877 873 if (old_data.get('email') or '').lower() != value:
878 874 user = User.get_by_email(value, case_insensitive=True)
879 875 if user:
880 876 msg = M(self, 'email_taken', state)
881 877 raise formencode.Invalid(
882 878 msg, value, state, error_dict={'email': msg}
883 879 )
884 880 return _validator
885 881
886 882
887 883 def ValidSystemEmail(localizer):
888 884 _ = localizer
889 885
890 886 class _validator(formencode.validators.FancyValidator):
891 887 messages = {
892 888 'non_existing_email': _('e-mail "%(email)s" does not exist.')
893 889 }
894 890
895 891 def _convert_to_python(self, value, state):
896 892 return value.lower()
897 893
898 894 def _validate_python(self, value, state):
899 895 user = User.get_by_email(value, case_insensitive=True)
900 896 if user is None:
901 897 msg = M(self, 'non_existing_email', state, email=value)
902 898 raise formencode.Invalid(
903 899 msg, value, state, error_dict={'email': msg}
904 900 )
905 901 return _validator
906 902
907 903
908 904 def NotReviewedRevisions(localizer, repo_id):
909 905 _ = localizer
910 906 class _validator(formencode.validators.FancyValidator):
911 907 messages = {
912 908 'rev_already_reviewed':
913 909 _('Revisions %(revs)s are already part of pull request '
914 910 'or have set status'),
915 911 }
916 912
917 913 def _validate_python(self, value, state):
918 914 # check revisions if they are not reviewed, or a part of another
919 915 # pull request
920 916 statuses = ChangesetStatus.query()\
921 917 .filter(ChangesetStatus.revision.in_(value))\
922 918 .filter(ChangesetStatus.repo_id == repo_id)\
923 919 .all()
924 920
925 921 errors = []
926 922 for status in statuses:
927 923 if status.pull_request_id:
928 924 errors.append(['pull_req', status.revision[:12]])
929 925 elif status.status:
930 926 errors.append(['status', status.revision[:12]])
931 927
932 928 if errors:
933 929 revs = ','.join([x[1] for x in errors])
934 930 msg = M(self, 'rev_already_reviewed', state, revs=revs)
935 931 raise formencode.Invalid(
936 932 msg, value, state, error_dict={'revisions': revs})
937 933
938 934 return _validator
939 935
940 936
941 937 def ValidIp(localizer):
942 938 _ = localizer
943 939
944 940 class _validator(CIDR):
945 941 messages = {
946 942 'badFormat': _('Please enter a valid IPv4 or IpV6 address'),
947 943 'illegalBits': _(
948 944 'The network size (bits) must be within the range '
949 945 'of 0-32 (not %(bits)r)'),
950 946 }
951 947
952 948 # we override the default to_python() call
953 949 def to_python(self, value, state):
954 950 v = super().to_python(value, state)
955 951 v = safe_str(v.strip())
956 952 net = ipaddress.ip_network(address=v, strict=False)
957 953 return str(net)
958 954
959 955 def _validate_python(self, value, state):
960 956 try:
961 957 addr = safe_str(value.strip())
962 958 # this raises an ValueError if address is not IpV4 or IpV6
963 959 ipaddress.ip_network(addr, strict=False)
964 960 except ValueError:
965 961 raise formencode.Invalid(self.message('badFormat', state),
966 962 value, state)
967 963 return _validator
968 964
969 965
970 966 def FieldKey(localizer):
971 967 _ = localizer
972 968
973 969 class _validator(formencode.validators.FancyValidator):
974 970 messages = {
975 971 'badFormat': _(
976 972 'Key name can only consist of letters, '
977 973 'underscore, dash or numbers'),
978 974 }
979 975
980 976 def _validate_python(self, value, state):
981 977 if not re.match('[a-zA-Z0-9_-]+$', value):
982 978 raise formencode.Invalid(self.message('badFormat', state),
983 979 value, state)
984 980 return _validator
985 981
986 982
987 983 def ValidAuthPlugins(localizer):
988 984 _ = localizer
989 985
990 986 class _validator(formencode.validators.FancyValidator):
991 987 messages = {
992 988 'import_duplicate': _(
993 989 'Plugins %(loaded)s and %(next_to_load)s '
994 990 'both export the same name'),
995 991 'missing_includeme': _(
996 992 'The plugin "%(plugin_id)s" is missing an includeme '
997 993 'function.'),
998 994 'import_error': _(
999 995 'Can not load plugin "%(plugin_id)s"'),
1000 996 'no_plugin': _(
1001 997 'No plugin available with ID "%(plugin_id)s"'),
1002 998 }
1003 999
1004 1000 def _convert_to_python(self, value, state):
1005 1001 # filter empty values
1006 1002 return [s for s in value if s not in [None, '']]
1007 1003
1008 1004 def _validate_legacy_plugin_id(self, plugin_id, value, state):
1009 1005 """
1010 1006 Validates that the plugin import works. It also checks that the
1011 1007 plugin has an includeme attribute.
1012 1008 """
1013 1009 try:
1014 1010 plugin = _import_legacy_plugin(plugin_id)
1015 1011 except Exception as e:
1016 1012 log.exception(
1017 1013 'Exception during import of auth legacy plugin "{}"'
1018 1014 .format(plugin_id))
1019 1015 msg = M(self, 'import_error', state, plugin_id=plugin_id)
1020 1016 raise formencode.Invalid(msg, value, state)
1021 1017
1022 1018 if not hasattr(plugin, 'includeme'):
1023 1019 msg = M(self, 'missing_includeme', state, plugin_id=plugin_id)
1024 1020 raise formencode.Invalid(msg, value, state)
1025 1021
1026 1022 return plugin
1027 1023
1028 1024 def _validate_plugin_id(self, plugin_id, value, state):
1029 1025 """
1030 1026 Plugins are already imported during app start up. Therefore this
1031 1027 validation only retrieves the plugin from the plugin registry and
1032 1028 if it returns something not None everything is OK.
1033 1029 """
1034 1030 plugin = loadplugin(plugin_id)
1035 1031
1036 1032 if plugin is None:
1037 1033 msg = M(self, 'no_plugin', state, plugin_id=plugin_id)
1038 1034 raise formencode.Invalid(msg, value, state)
1039 1035
1040 1036 return plugin
1041 1037
1042 1038 def _validate_python(self, value, state):
1043 1039 unique_names = {}
1044 1040 for plugin_id in value:
1045 1041
1046 1042 # Validate legacy or normal plugin.
1047 1043 if plugin_id.startswith(legacy_plugin_prefix):
1048 1044 plugin = self._validate_legacy_plugin_id(
1049 1045 plugin_id, value, state)
1050 1046 else:
1051 1047 plugin = self._validate_plugin_id(plugin_id, value, state)
1052 1048
1053 1049 # Only allow unique plugin names.
1054 1050 if plugin.name in unique_names:
1055 1051 msg = M(self, 'import_duplicate', state,
1056 1052 loaded=unique_names[plugin.name],
1057 1053 next_to_load=plugin)
1058 1054 raise formencode.Invalid(msg, value, state)
1059 1055 unique_names[plugin.name] = plugin
1060 1056 return _validator
1061 1057
1062 1058
1063 1059 def ValidPattern(localizer):
1064 1060 _ = localizer
1065 1061
1066 1062 class _validator(formencode.validators.FancyValidator):
1067 1063 messages = {
1068 1064 'bad_format': _('Url must start with http or /'),
1069 1065 }
1070 1066
1071 1067 def _convert_to_python(self, value, state):
1072 1068 patterns = []
1073 1069
1074 1070 prefix = 'new_pattern'
1075 1071 for name, v in list(value.items()):
1076 1072 pattern_name = '_'.join((prefix, 'pattern'))
1077 1073 if name.startswith(pattern_name):
1078 1074 new_item_id = name[len(pattern_name)+1:]
1079 1075
1080 1076 def _field(name):
1081 1077 return '{}_{}_{}'.format(prefix, name, new_item_id)
1082 1078
1083 1079 values = {
1084 1080 'issuetracker_pat': value.get(_field('pattern')),
1085 1081 'issuetracker_url': value.get(_field('url')),
1086 1082 'issuetracker_pref': value.get(_field('prefix')),
1087 1083 'issuetracker_desc': value.get(_field('description'))
1088 1084 }
1089 1085 new_uid = md5_safe(values['issuetracker_pat'])
1090 1086
1091 1087 has_required_fields = (
1092 1088 values['issuetracker_pat']
1093 1089 and values['issuetracker_url'])
1094 1090
1095 1091 if has_required_fields:
1096 1092 # validate url that it starts with http or /
1097 1093 # otherwise it can lead to JS injections
1098 1094 # e.g specifig javascript:<malicios code>
1099 1095 if not values['issuetracker_url'].startswith(('http', '/')):
1100 1096 raise formencode.Invalid(
1101 1097 self.message('bad_format', state),
1102 1098 value, state)
1103 1099
1104 1100 settings = [
1105 1101 ('_'.join((key, new_uid)), values[key], 'unicode')
1106 1102 for key in values]
1107 1103 patterns.append(settings)
1108 1104
1109 1105 value['patterns'] = patterns
1110 1106 delete_patterns = value.get('uid') or []
1111 1107 if not isinstance(delete_patterns, (list, tuple)):
1112 1108 delete_patterns = [delete_patterns]
1113 1109 value['delete_patterns'] = delete_patterns
1114 1110 return value
1115 1111 return _validator
General Comments 0
You need to be logged in to leave comments. Login now