##// END OF EJS Templates
repo-creation: validate and forbid creating .git suffixed repository names....
marcink -
r1644:d43cef75 default
parent child Browse files
Show More
@@ -1,561 +1,561 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 this is forms validation classes
23 23 http://formencode.org/module-formencode.validators.html
24 24 for list off all availible validators
25 25
26 26 we can create our own validators
27 27
28 28 The table below outlines the options which can be used in a schema in addition to the validators themselves
29 29 pre_validators [] These validators will be applied before the schema
30 30 chained_validators [] These validators will be applied after the schema
31 31 allow_extra_fields False If True, then it is not an error when keys that aren't associated with a validator are present
32 32 filter_extra_fields False If True, then keys that aren't associated with a validator are removed
33 33 if_key_missing NoDefault If this is given, then any keys that aren't available but are expected will be replaced with this value (and then validated). This does not override a present .if_missing attribute on validators. NoDefault is a special FormEncode class to mean that no default values has been specified and therefore missing keys shouldn't take a default value.
34 34 ignore_key_missing False If True, then missing keys will be missing in the result, if the validator doesn't have .if_missing on it already
35 35
36 36
37 37 <name> = formencode.validators.<name of validator>
38 38 <name> must equal form name
39 39 list=[1,2,3,4,5]
40 40 for SELECT use formencode.All(OneOf(list), Int())
41 41
42 42 """
43 43
44 44 import deform
45 45 import logging
46 46 import formencode
47 47
48 48 from pkg_resources import resource_filename
49 49 from formencode import All, Pipe
50 50
51 51 from pylons.i18n.translation import _
52 52
53 53 from rhodecode import BACKENDS
54 54 from rhodecode.lib import helpers
55 55 from rhodecode.model import validators as v
56 56
57 57 log = logging.getLogger(__name__)
58 58
59 59
60 60 deform_templates = resource_filename('deform', 'templates')
61 61 rhodecode_templates = resource_filename('rhodecode', 'templates/forms')
62 62 search_path = (rhodecode_templates, deform_templates)
63 63
64 64
65 65 class RhodecodeFormZPTRendererFactory(deform.ZPTRendererFactory):
66 66 """ Subclass of ZPTRendererFactory to add rhodecode context variables """
67 67 def __call__(self, template_name, **kw):
68 68 kw['h'] = helpers
69 69 return self.load(template_name)(**kw)
70 70
71 71
72 72 form_renderer = RhodecodeFormZPTRendererFactory(search_path)
73 73 deform.Form.set_default_renderer(form_renderer)
74 74
75 75
76 76 def LoginForm():
77 77 class _LoginForm(formencode.Schema):
78 78 allow_extra_fields = True
79 79 filter_extra_fields = True
80 80 username = v.UnicodeString(
81 81 strip=True,
82 82 min=1,
83 83 not_empty=True,
84 84 messages={
85 85 'empty': _(u'Please enter a login'),
86 86 'tooShort': _(u'Enter a value %(min)i characters long or more')
87 87 }
88 88 )
89 89
90 90 password = v.UnicodeString(
91 91 strip=False,
92 92 min=3,
93 93 not_empty=True,
94 94 messages={
95 95 'empty': _(u'Please enter a password'),
96 96 'tooShort': _(u'Enter %(min)i characters or more')}
97 97 )
98 98
99 99 remember = v.StringBoolean(if_missing=False)
100 100
101 101 chained_validators = [v.ValidAuth()]
102 102 return _LoginForm
103 103
104 104
105 105 def UserForm(edit=False, available_languages=[], old_data={}):
106 106 class _UserForm(formencode.Schema):
107 107 allow_extra_fields = True
108 108 filter_extra_fields = True
109 109 username = All(v.UnicodeString(strip=True, min=1, not_empty=True),
110 110 v.ValidUsername(edit, old_data))
111 111 if edit:
112 112 new_password = All(
113 113 v.ValidPassword(),
114 114 v.UnicodeString(strip=False, min=6, not_empty=False)
115 115 )
116 116 password_confirmation = All(
117 117 v.ValidPassword(),
118 118 v.UnicodeString(strip=False, min=6, not_empty=False),
119 119 )
120 120 admin = v.StringBoolean(if_missing=False)
121 121 else:
122 122 password = All(
123 123 v.ValidPassword(),
124 124 v.UnicodeString(strip=False, min=6, not_empty=True)
125 125 )
126 126 password_confirmation = All(
127 127 v.ValidPassword(),
128 128 v.UnicodeString(strip=False, min=6, not_empty=False)
129 129 )
130 130
131 131 password_change = v.StringBoolean(if_missing=False)
132 132 create_repo_group = v.StringBoolean(if_missing=False)
133 133
134 134 active = v.StringBoolean(if_missing=False)
135 135 firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
136 136 lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
137 137 email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
138 138 extern_name = v.UnicodeString(strip=True)
139 139 extern_type = v.UnicodeString(strip=True)
140 140 language = v.OneOf(available_languages, hideList=False,
141 141 testValueList=True, if_missing=None)
142 142 chained_validators = [v.ValidPasswordsMatch()]
143 143 return _UserForm
144 144
145 145
146 146 def UserGroupForm(edit=False, old_data=None, allow_disabled=False):
147 147 old_data = old_data or {}
148 148
149 149 class _UserGroupForm(formencode.Schema):
150 150 allow_extra_fields = True
151 151 filter_extra_fields = True
152 152
153 153 users_group_name = All(
154 154 v.UnicodeString(strip=True, min=1, not_empty=True),
155 155 v.ValidUserGroup(edit, old_data)
156 156 )
157 157 user_group_description = v.UnicodeString(strip=True, min=1,
158 158 not_empty=False)
159 159
160 160 users_group_active = v.StringBoolean(if_missing=False)
161 161
162 162 if edit:
163 163 # this is user group owner
164 164 user = All(
165 165 v.UnicodeString(not_empty=True),
166 166 v.ValidRepoUser(allow_disabled))
167 167 return _UserGroupForm
168 168
169 169
170 170 def RepoGroupForm(edit=False, old_data=None, available_groups=None,
171 171 can_create_in_root=False, allow_disabled=False):
172 172 old_data = old_data or {}
173 173 available_groups = available_groups or []
174 174
175 175 class _RepoGroupForm(formencode.Schema):
176 176 allow_extra_fields = True
177 177 filter_extra_fields = False
178 178
179 179 group_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
180 180 v.SlugifyName(),)
181 181 group_description = v.UnicodeString(strip=True, min=1,
182 182 not_empty=False)
183 183 group_copy_permissions = v.StringBoolean(if_missing=False)
184 184
185 185 group_parent_id = v.OneOf(available_groups, hideList=False,
186 186 testValueList=True, not_empty=True)
187 187 enable_locking = v.StringBoolean(if_missing=False)
188 188 chained_validators = [
189 189 v.ValidRepoGroup(edit, old_data, can_create_in_root)]
190 190
191 191 if edit:
192 192 # this is repo group owner
193 193 user = All(
194 194 v.UnicodeString(not_empty=True),
195 195 v.ValidRepoUser(allow_disabled))
196 196
197 197 return _RepoGroupForm
198 198
199 199
200 200 def RegisterForm(edit=False, old_data={}):
201 201 class _RegisterForm(formencode.Schema):
202 202 allow_extra_fields = True
203 203 filter_extra_fields = True
204 204 username = All(
205 205 v.ValidUsername(edit, old_data),
206 206 v.UnicodeString(strip=True, min=1, not_empty=True)
207 207 )
208 208 password = All(
209 209 v.ValidPassword(),
210 210 v.UnicodeString(strip=False, min=6, not_empty=True)
211 211 )
212 212 password_confirmation = All(
213 213 v.ValidPassword(),
214 214 v.UnicodeString(strip=False, min=6, not_empty=True)
215 215 )
216 216 active = v.StringBoolean(if_missing=False)
217 217 firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
218 218 lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
219 219 email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
220 220
221 221 chained_validators = [v.ValidPasswordsMatch()]
222 222
223 223 return _RegisterForm
224 224
225 225
226 226 def PasswordResetForm():
227 227 class _PasswordResetForm(formencode.Schema):
228 228 allow_extra_fields = True
229 229 filter_extra_fields = True
230 230 email = All(v.ValidSystemEmail(), v.Email(not_empty=True))
231 231 return _PasswordResetForm
232 232
233 233
234 234 def RepoForm(edit=False, old_data=None, repo_groups=None, landing_revs=None,
235 235 allow_disabled=False):
236 236 old_data = old_data or {}
237 237 repo_groups = repo_groups or []
238 238 landing_revs = landing_revs or []
239 239 supported_backends = BACKENDS.keys()
240 240
241 241 class _RepoForm(formencode.Schema):
242 242 allow_extra_fields = True
243 243 filter_extra_fields = False
244 244 repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
245 v.SlugifyName())
245 v.SlugifyName(), v.CannotHaveGitSuffix())
246 246 repo_group = All(v.CanWriteGroup(old_data),
247 247 v.OneOf(repo_groups, hideList=True))
248 248 repo_type = v.OneOf(supported_backends, required=False,
249 249 if_missing=old_data.get('repo_type'))
250 250 repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
251 251 repo_private = v.StringBoolean(if_missing=False)
252 252 repo_landing_rev = v.OneOf(landing_revs, hideList=True)
253 253 repo_copy_permissions = v.StringBoolean(if_missing=False)
254 254 clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
255 255
256 256 repo_enable_statistics = v.StringBoolean(if_missing=False)
257 257 repo_enable_downloads = v.StringBoolean(if_missing=False)
258 258 repo_enable_locking = v.StringBoolean(if_missing=False)
259 259
260 260 if edit:
261 261 # this is repo owner
262 262 user = All(
263 263 v.UnicodeString(not_empty=True),
264 264 v.ValidRepoUser(allow_disabled))
265 265 clone_uri_change = v.UnicodeString(
266 266 not_empty=False, if_missing=v.Missing)
267 267
268 268 chained_validators = [v.ValidCloneUri(),
269 269 v.ValidRepoName(edit, old_data)]
270 270 return _RepoForm
271 271
272 272
273 273 def RepoPermsForm():
274 274 class _RepoPermsForm(formencode.Schema):
275 275 allow_extra_fields = True
276 276 filter_extra_fields = False
277 277 chained_validators = [v.ValidPerms(type_='repo')]
278 278 return _RepoPermsForm
279 279
280 280
281 281 def RepoGroupPermsForm(valid_recursive_choices):
282 282 class _RepoGroupPermsForm(formencode.Schema):
283 283 allow_extra_fields = True
284 284 filter_extra_fields = False
285 285 recursive = v.OneOf(valid_recursive_choices)
286 286 chained_validators = [v.ValidPerms(type_='repo_group')]
287 287 return _RepoGroupPermsForm
288 288
289 289
290 290 def UserGroupPermsForm():
291 291 class _UserPermsForm(formencode.Schema):
292 292 allow_extra_fields = True
293 293 filter_extra_fields = False
294 294 chained_validators = [v.ValidPerms(type_='user_group')]
295 295 return _UserPermsForm
296 296
297 297
298 298 def RepoFieldForm():
299 299 class _RepoFieldForm(formencode.Schema):
300 300 filter_extra_fields = True
301 301 allow_extra_fields = True
302 302
303 303 new_field_key = All(v.FieldKey(),
304 304 v.UnicodeString(strip=True, min=3, not_empty=True))
305 305 new_field_value = v.UnicodeString(not_empty=False, if_missing=u'')
306 306 new_field_type = v.OneOf(['str', 'unicode', 'list', 'tuple'],
307 307 if_missing='str')
308 308 new_field_label = v.UnicodeString(not_empty=False)
309 309 new_field_desc = v.UnicodeString(not_empty=False)
310 310
311 311 return _RepoFieldForm
312 312
313 313
314 314 def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
315 315 repo_groups=[], landing_revs=[]):
316 316 class _RepoForkForm(formencode.Schema):
317 317 allow_extra_fields = True
318 318 filter_extra_fields = False
319 319 repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
320 320 v.SlugifyName())
321 321 repo_group = All(v.CanWriteGroup(),
322 322 v.OneOf(repo_groups, hideList=True))
323 323 repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
324 324 description = v.UnicodeString(strip=True, min=1, not_empty=True)
325 325 private = v.StringBoolean(if_missing=False)
326 326 copy_permissions = v.StringBoolean(if_missing=False)
327 327 fork_parent_id = v.UnicodeString()
328 328 chained_validators = [v.ValidForkName(edit, old_data)]
329 329 landing_rev = v.OneOf(landing_revs, hideList=True)
330 330
331 331 return _RepoForkForm
332 332
333 333
334 334 def ApplicationSettingsForm():
335 335 class _ApplicationSettingsForm(formencode.Schema):
336 336 allow_extra_fields = True
337 337 filter_extra_fields = False
338 338 rhodecode_title = v.UnicodeString(strip=True, max=40, not_empty=False)
339 339 rhodecode_realm = v.UnicodeString(strip=True, min=1, not_empty=True)
340 340 rhodecode_pre_code = v.UnicodeString(strip=True, min=1, not_empty=False)
341 341 rhodecode_post_code = v.UnicodeString(strip=True, min=1, not_empty=False)
342 342 rhodecode_captcha_public_key = v.UnicodeString(strip=True, min=1, not_empty=False)
343 343 rhodecode_captcha_private_key = v.UnicodeString(strip=True, min=1, not_empty=False)
344 344 rhodecode_create_personal_repo_group = v.StringBoolean(if_missing=False)
345 345 rhodecode_personal_repo_group_pattern = v.UnicodeString(strip=True, min=1, not_empty=False)
346 346
347 347 return _ApplicationSettingsForm
348 348
349 349
350 350 def ApplicationVisualisationForm():
351 351 class _ApplicationVisualisationForm(formencode.Schema):
352 352 allow_extra_fields = True
353 353 filter_extra_fields = False
354 354 rhodecode_show_public_icon = v.StringBoolean(if_missing=False)
355 355 rhodecode_show_private_icon = v.StringBoolean(if_missing=False)
356 356 rhodecode_stylify_metatags = v.StringBoolean(if_missing=False)
357 357
358 358 rhodecode_repository_fields = v.StringBoolean(if_missing=False)
359 359 rhodecode_lightweight_journal = v.StringBoolean(if_missing=False)
360 360 rhodecode_dashboard_items = v.Int(min=5, not_empty=True)
361 361 rhodecode_admin_grid_items = v.Int(min=5, not_empty=True)
362 362 rhodecode_show_version = v.StringBoolean(if_missing=False)
363 363 rhodecode_use_gravatar = v.StringBoolean(if_missing=False)
364 364 rhodecode_markup_renderer = v.OneOf(['markdown', 'rst'])
365 365 rhodecode_gravatar_url = v.UnicodeString(min=3)
366 366 rhodecode_clone_uri_tmpl = v.UnicodeString(min=3)
367 367 rhodecode_support_url = v.UnicodeString()
368 368 rhodecode_show_revision_number = v.StringBoolean(if_missing=False)
369 369 rhodecode_show_sha_length = v.Int(min=4, not_empty=True)
370 370
371 371 return _ApplicationVisualisationForm
372 372
373 373
374 374 class _BaseVcsSettingsForm(formencode.Schema):
375 375 allow_extra_fields = True
376 376 filter_extra_fields = False
377 377 hooks_changegroup_repo_size = v.StringBoolean(if_missing=False)
378 378 hooks_changegroup_push_logger = v.StringBoolean(if_missing=False)
379 379 hooks_outgoing_pull_logger = v.StringBoolean(if_missing=False)
380 380
381 381 # PR/Code-review
382 382 rhodecode_pr_merge_enabled = v.StringBoolean(if_missing=False)
383 383 rhodecode_use_outdated_comments = v.StringBoolean(if_missing=False)
384 384
385 385 # hg
386 386 extensions_largefiles = v.StringBoolean(if_missing=False)
387 387 phases_publish = v.StringBoolean(if_missing=False)
388 388 rhodecode_hg_use_rebase_for_merging = v.StringBoolean(if_missing=False)
389 389
390 390 # git
391 391 vcs_git_lfs_enabled = v.StringBoolean(if_missing=False)
392 392
393 393 # svn
394 394 vcs_svn_proxy_http_requests_enabled = v.StringBoolean(if_missing=False)
395 395 vcs_svn_proxy_http_server_url = v.UnicodeString(strip=True, if_missing=None)
396 396
397 397
398 398 def ApplicationUiSettingsForm():
399 399 class _ApplicationUiSettingsForm(_BaseVcsSettingsForm):
400 400 web_push_ssl = v.StringBoolean(if_missing=False)
401 401 paths_root_path = All(
402 402 v.ValidPath(),
403 403 v.UnicodeString(strip=True, min=1, not_empty=True)
404 404 )
405 405 largefiles_usercache = All(
406 406 v.ValidPath(),
407 407 v.UnicodeString(strip=True, min=2, not_empty=True))
408 408 vcs_git_lfs_store_location = All(
409 409 v.ValidPath(),
410 410 v.UnicodeString(strip=True, min=2, not_empty=True))
411 411 extensions_hgsubversion = v.StringBoolean(if_missing=False)
412 412 extensions_hggit = v.StringBoolean(if_missing=False)
413 413 new_svn_branch = v.ValidSvnPattern(section='vcs_svn_branch')
414 414 new_svn_tag = v.ValidSvnPattern(section='vcs_svn_tag')
415 415
416 416 return _ApplicationUiSettingsForm
417 417
418 418
419 419 def RepoVcsSettingsForm(repo_name):
420 420 class _RepoVcsSettingsForm(_BaseVcsSettingsForm):
421 421 inherit_global_settings = v.StringBoolean(if_missing=False)
422 422 new_svn_branch = v.ValidSvnPattern(
423 423 section='vcs_svn_branch', repo_name=repo_name)
424 424 new_svn_tag = v.ValidSvnPattern(
425 425 section='vcs_svn_tag', repo_name=repo_name)
426 426
427 427 return _RepoVcsSettingsForm
428 428
429 429
430 430 def LabsSettingsForm():
431 431 class _LabSettingsForm(formencode.Schema):
432 432 allow_extra_fields = True
433 433 filter_extra_fields = False
434 434
435 435 return _LabSettingsForm
436 436
437 437
438 438 def ApplicationPermissionsForm(
439 439 register_choices, password_reset_choices, extern_activate_choices):
440 440 class _DefaultPermissionsForm(formencode.Schema):
441 441 allow_extra_fields = True
442 442 filter_extra_fields = True
443 443
444 444 anonymous = v.StringBoolean(if_missing=False)
445 445 default_register = v.OneOf(register_choices)
446 446 default_register_message = v.UnicodeString()
447 447 default_password_reset = v.OneOf(password_reset_choices)
448 448 default_extern_activate = v.OneOf(extern_activate_choices)
449 449
450 450 return _DefaultPermissionsForm
451 451
452 452
453 453 def ObjectPermissionsForm(repo_perms_choices, group_perms_choices,
454 454 user_group_perms_choices):
455 455 class _ObjectPermissionsForm(formencode.Schema):
456 456 allow_extra_fields = True
457 457 filter_extra_fields = True
458 458 overwrite_default_repo = v.StringBoolean(if_missing=False)
459 459 overwrite_default_group = v.StringBoolean(if_missing=False)
460 460 overwrite_default_user_group = v.StringBoolean(if_missing=False)
461 461 default_repo_perm = v.OneOf(repo_perms_choices)
462 462 default_group_perm = v.OneOf(group_perms_choices)
463 463 default_user_group_perm = v.OneOf(user_group_perms_choices)
464 464
465 465 return _ObjectPermissionsForm
466 466
467 467
468 468 def UserPermissionsForm(create_choices, create_on_write_choices,
469 469 repo_group_create_choices, user_group_create_choices,
470 470 fork_choices, inherit_default_permissions_choices):
471 471 class _DefaultPermissionsForm(formencode.Schema):
472 472 allow_extra_fields = True
473 473 filter_extra_fields = True
474 474
475 475 anonymous = v.StringBoolean(if_missing=False)
476 476
477 477 default_repo_create = v.OneOf(create_choices)
478 478 default_repo_create_on_write = v.OneOf(create_on_write_choices)
479 479 default_user_group_create = v.OneOf(user_group_create_choices)
480 480 default_repo_group_create = v.OneOf(repo_group_create_choices)
481 481 default_fork_create = v.OneOf(fork_choices)
482 482 default_inherit_default_permissions = v.OneOf(inherit_default_permissions_choices)
483 483
484 484 return _DefaultPermissionsForm
485 485
486 486
487 487 def UserIndividualPermissionsForm():
488 488 class _DefaultPermissionsForm(formencode.Schema):
489 489 allow_extra_fields = True
490 490 filter_extra_fields = True
491 491
492 492 inherit_default_permissions = v.StringBoolean(if_missing=False)
493 493
494 494 return _DefaultPermissionsForm
495 495
496 496
497 497 def DefaultsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
498 498 class _DefaultsForm(formencode.Schema):
499 499 allow_extra_fields = True
500 500 filter_extra_fields = True
501 501 default_repo_type = v.OneOf(supported_backends)
502 502 default_repo_private = v.StringBoolean(if_missing=False)
503 503 default_repo_enable_statistics = v.StringBoolean(if_missing=False)
504 504 default_repo_enable_downloads = v.StringBoolean(if_missing=False)
505 505 default_repo_enable_locking = v.StringBoolean(if_missing=False)
506 506
507 507 return _DefaultsForm
508 508
509 509
510 510 def AuthSettingsForm():
511 511 class _AuthSettingsForm(formencode.Schema):
512 512 allow_extra_fields = True
513 513 filter_extra_fields = True
514 514 auth_plugins = All(v.ValidAuthPlugins(),
515 515 v.UniqueListFromString()(not_empty=True))
516 516
517 517 return _AuthSettingsForm
518 518
519 519
520 520 def UserExtraEmailForm():
521 521 class _UserExtraEmailForm(formencode.Schema):
522 522 email = All(v.UniqSystemEmail(), v.Email(not_empty=True))
523 523 return _UserExtraEmailForm
524 524
525 525
526 526 def UserExtraIpForm():
527 527 class _UserExtraIpForm(formencode.Schema):
528 528 ip = v.ValidIp()(not_empty=True)
529 529 return _UserExtraIpForm
530 530
531 531
532 532
533 533 def PullRequestForm(repo_id):
534 534 class ReviewerForm(formencode.Schema):
535 535 user_id = v.Int(not_empty=True)
536 536 reasons = All()
537 537
538 538 class _PullRequestForm(formencode.Schema):
539 539 allow_extra_fields = True
540 540 filter_extra_fields = True
541 541
542 542 user = v.UnicodeString(strip=True, required=True)
543 543 source_repo = v.UnicodeString(strip=True, required=True)
544 544 source_ref = v.UnicodeString(strip=True, required=True)
545 545 target_repo = v.UnicodeString(strip=True, required=True)
546 546 target_ref = v.UnicodeString(strip=True, required=True)
547 547 revisions = All(#v.NotReviewedRevisions(repo_id)(),
548 548 v.UniqueList()(not_empty=True))
549 549 review_members = formencode.ForEach(ReviewerForm())
550 550 pullrequest_title = v.UnicodeString(strip=True, required=True)
551 551 pullrequest_desc = v.UnicodeString(strip=True, required=False)
552 552
553 553 return _PullRequestForm
554 554
555 555
556 556 def IssueTrackerPatternsForm():
557 557 class _IssueTrackerPatternsForm(formencode.Schema):
558 558 allow_extra_fields = True
559 559 filter_extra_fields = False
560 560 chained_validators = [v.ValidPattern()]
561 561 return _IssueTrackerPatternsForm
@@ -1,321 +1,326 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import colander
22 22
23 23 from rhodecode.translation import _
24 24 from rhodecode.model.validation_schema import validators, preparers, types
25 25
26 26 DEFAULT_LANDING_REF = 'rev:tip'
27 27
28 28
29 29 def get_group_and_repo(repo_name):
30 30 from rhodecode.model.repo_group import RepoGroupModel
31 31 return RepoGroupModel()._get_group_name_and_parent(
32 32 repo_name, get_object=True)
33 33
34 34
35 35 @colander.deferred
36 36 def deferred_repo_type_validator(node, kw):
37 37 options = kw.get('repo_type_options', [])
38 38 return colander.OneOf([x for x in options])
39 39
40 40
41 41 @colander.deferred
42 42 def deferred_repo_owner_validator(node, kw):
43 43
44 44 def repo_owner_validator(node, value):
45 45 from rhodecode.model.db import User
46 46 existing = User.get_by_username(value)
47 47 if not existing:
48 48 msg = _(u'Repo owner with id `{}` does not exists').format(value)
49 49 raise colander.Invalid(node, msg)
50 50
51 51 return repo_owner_validator
52 52
53 53
54 54 @colander.deferred
55 55 def deferred_landing_ref_validator(node, kw):
56 56 options = kw.get('repo_ref_options', [DEFAULT_LANDING_REF])
57 57 return colander.OneOf([x for x in options])
58 58
59 59
60 60 @colander.deferred
61 61 def deferred_fork_of_validator(node, kw):
62 62 old_values = kw.get('old_values') or {}
63 63
64 64 def fork_of_validator(node, value):
65 65 from rhodecode.model.db import Repository, RepoGroup
66 66 existing = Repository.get_by_repo_name(value)
67 67 if not existing:
68 68 msg = _(u'Fork with id `{}` does not exists').format(value)
69 69 raise colander.Invalid(node, msg)
70 70 elif old_values['repo_name'] == existing.repo_name:
71 71 msg = _(u'Cannot set fork of '
72 72 u'parameter of this repository to itself').format(value)
73 73 raise colander.Invalid(node, msg)
74 74
75 75 return fork_of_validator
76 76
77 77
78 78 @colander.deferred
79 79 def deferred_can_write_to_group_validator(node, kw):
80 80 request_user = kw.get('user')
81 81 old_values = kw.get('old_values') or {}
82 82
83 83 def can_write_to_group_validator(node, value):
84 84 """
85 85 Checks if given repo path is writable by user. This includes checks if
86 86 user is allowed to create repositories under root path or under
87 87 repo group paths
88 88 """
89 89
90 90 from rhodecode.lib.auth import (
91 91 HasPermissionAny, HasRepoGroupPermissionAny)
92 92 from rhodecode.model.repo_group import RepoGroupModel
93 93
94 94 messages = {
95 95 'invalid_repo_group':
96 96 _(u"Repository group `{}` does not exist"),
97 97 # permissions denied we expose as not existing, to prevent
98 98 # resource discovery
99 99 'permission_denied':
100 100 _(u"Repository group `{}` does not exist"),
101 101 'permission_denied_root':
102 102 _(u"You do not have the permission to store "
103 103 u"repositories in the root location.")
104 104 }
105 105
106 106 value = value['repo_group_name']
107 107
108 108 is_root_location = value is types.RootLocation
109 109 # NOT initialized validators, we must call them
110 110 can_create_repos_at_root = HasPermissionAny(
111 111 'hg.admin', 'hg.create.repository')
112 112
113 113 # if values is root location, we simply need to check if we can write
114 114 # to root location !
115 115 if is_root_location:
116 116 if can_create_repos_at_root(user=request_user):
117 117 # we can create repo group inside tool-level. No more checks
118 118 # are required
119 119 return
120 120 else:
121 121 # "fake" node name as repo_name, otherwise we oddly report
122 122 # the error as if it was coming form repo_group
123 123 # however repo_group is empty when using root location.
124 124 node.name = 'repo_name'
125 125 raise colander.Invalid(node, messages['permission_denied_root'])
126 126
127 127 # parent group not exists ? throw an error
128 128 repo_group = RepoGroupModel().get_by_group_name(value)
129 129 if value and not repo_group:
130 130 raise colander.Invalid(
131 131 node, messages['invalid_repo_group'].format(value))
132 132
133 133 gr_name = repo_group.group_name
134 134
135 135 # create repositories with write permission on group is set to true
136 136 create_on_write = HasPermissionAny(
137 137 'hg.create.write_on_repogroup.true')(user=request_user)
138 138
139 139 group_admin = HasRepoGroupPermissionAny('group.admin')(
140 140 gr_name, 'can write into group validator', user=request_user)
141 141 group_write = HasRepoGroupPermissionAny('group.write')(
142 142 gr_name, 'can write into group validator', user=request_user)
143 143
144 144 forbidden = not (group_admin or (group_write and create_on_write))
145 145
146 146 # TODO: handling of old values, and detecting no-change in path
147 147 # to skip permission checks in such cases. This only needs to be
148 148 # implemented if we use this schema in forms as well
149 149
150 150 # gid = (old_data['repo_group'].get('group_id')
151 151 # if (old_data and 'repo_group' in old_data) else None)
152 152 # value_changed = gid != safe_int(value)
153 153 # new = not old_data
154 154
155 155 # do check if we changed the value, there's a case that someone got
156 156 # revoked write permissions to a repository, he still created, we
157 157 # don't need to check permission if he didn't change the value of
158 158 # groups in form box
159 159 # if value_changed or new:
160 160 # # parent group need to be existing
161 161 # TODO: ENDS HERE
162 162
163 163 if repo_group and forbidden:
164 164 msg = messages['permission_denied'].format(value)
165 165 raise colander.Invalid(node, msg)
166 166
167 167 return can_write_to_group_validator
168 168
169 169
170 170 @colander.deferred
171 171 def deferred_unique_name_validator(node, kw):
172 172 request_user = kw.get('user')
173 173 old_values = kw.get('old_values') or {}
174 174
175 175 def unique_name_validator(node, value):
176 176 from rhodecode.model.db import Repository, RepoGroup
177 177 name_changed = value != old_values.get('repo_name')
178 178
179 179 existing = Repository.get_by_repo_name(value)
180 180 if name_changed and existing:
181 181 msg = _(u'Repository with name `{}` already exists').format(value)
182 182 raise colander.Invalid(node, msg)
183 183
184 184 existing_group = RepoGroup.get_by_group_name(value)
185 185 if name_changed and existing_group:
186 186 msg = _(u'Repository group with name `{}` already exists').format(
187 187 value)
188 188 raise colander.Invalid(node, msg)
189 189 return unique_name_validator
190 190
191 191
192 192 @colander.deferred
193 193 def deferred_repo_name_validator(node, kw):
194 return validators.valid_name_validator
194 def no_git_suffix_validator(node, value):
195 if value.endswith('.git'):
196 msg = _('Repository name cannot end with .git')
197 raise colander.Invalid(node, msg)
198 return colander.All(
199 no_git_suffix_validator, validators.valid_name_validator)
195 200
196 201
197 202 class GroupType(colander.Mapping):
198 203 def _validate(self, node, value):
199 204 try:
200 205 return dict(repo_group_name=value)
201 206 except Exception as e:
202 207 raise colander.Invalid(
203 208 node, '"${val}" is not a mapping type: ${err}'.format(
204 209 val=value, err=e))
205 210
206 211 def deserialize(self, node, cstruct):
207 212 if cstruct is colander.null:
208 213 return cstruct
209 214
210 215 appstruct = super(GroupType, self).deserialize(node, cstruct)
211 216 validated_name = appstruct['repo_group_name']
212 217
213 218 # inject group based on once deserialized data
214 219 (repo_name_without_group,
215 220 parent_group_name,
216 221 parent_group) = get_group_and_repo(validated_name)
217 222
218 223 appstruct['repo_name_without_group'] = repo_name_without_group
219 224 appstruct['repo_group_name'] = parent_group_name or types.RootLocation
220 225 if parent_group:
221 226 appstruct['repo_group_id'] = parent_group.group_id
222 227
223 228 return appstruct
224 229
225 230
226 231 class GroupSchema(colander.SchemaNode):
227 232 schema_type = GroupType
228 233 validator = deferred_can_write_to_group_validator
229 234 missing = colander.null
230 235
231 236
232 237 class RepoGroup(GroupSchema):
233 238 repo_group_name = colander.SchemaNode(
234 239 types.GroupNameType())
235 240 repo_group_id = colander.SchemaNode(
236 241 colander.String(), missing=None)
237 242 repo_name_without_group = colander.SchemaNode(
238 243 colander.String(), missing=None)
239 244
240 245
241 246 class RepoGroupAccessSchema(colander.MappingSchema):
242 247 repo_group = RepoGroup()
243 248
244 249
245 250 class RepoNameUniqueSchema(colander.MappingSchema):
246 251 unique_repo_name = colander.SchemaNode(
247 252 colander.String(),
248 253 validator=deferred_unique_name_validator)
249 254
250 255
251 256 class RepoSchema(colander.MappingSchema):
252 257
253 258 repo_name = colander.SchemaNode(
254 259 types.RepoNameType(),
255 260 validator=deferred_repo_name_validator)
256 261
257 262 repo_type = colander.SchemaNode(
258 263 colander.String(),
259 264 validator=deferred_repo_type_validator)
260 265
261 266 repo_owner = colander.SchemaNode(
262 267 colander.String(),
263 268 validator=deferred_repo_owner_validator)
264 269
265 270 repo_description = colander.SchemaNode(
266 271 colander.String(), missing='')
267 272
268 273 repo_landing_commit_ref = colander.SchemaNode(
269 274 colander.String(),
270 275 validator=deferred_landing_ref_validator,
271 276 preparers=[preparers.strip_preparer],
272 277 missing=DEFAULT_LANDING_REF)
273 278
274 279 repo_clone_uri = colander.SchemaNode(
275 280 colander.String(),
276 281 validator=colander.All(colander.Length(min=1)),
277 282 preparers=[preparers.strip_preparer],
278 283 missing='')
279 284
280 285 repo_fork_of = colander.SchemaNode(
281 286 colander.String(),
282 287 validator=deferred_fork_of_validator,
283 288 missing=None)
284 289
285 290 repo_private = colander.SchemaNode(
286 291 types.StringBooleanType(),
287 292 missing=False)
288 293 repo_copy_permissions = colander.SchemaNode(
289 294 types.StringBooleanType(),
290 295 missing=False)
291 296 repo_enable_statistics = colander.SchemaNode(
292 297 types.StringBooleanType(),
293 298 missing=False)
294 299 repo_enable_downloads = colander.SchemaNode(
295 300 types.StringBooleanType(),
296 301 missing=False)
297 302 repo_enable_locking = colander.SchemaNode(
298 303 types.StringBooleanType(),
299 304 missing=False)
300 305
301 306 def deserialize(self, cstruct):
302 307 """
303 308 Custom deserialize that allows to chain validation, and verify
304 309 permissions, and as last step uniqueness
305 310 """
306 311
307 312 # first pass, to validate given data
308 313 appstruct = super(RepoSchema, self).deserialize(cstruct)
309 314 validated_name = appstruct['repo_name']
310 315
311 316 # second pass to validate permissions to repo_group
312 317 second = RepoGroupAccessSchema().bind(**self.bindings)
313 318 appstruct_second = second.deserialize({'repo_group': validated_name})
314 319 # save result
315 320 appstruct['repo_group'] = appstruct_second['repo_group']
316 321
317 322 # thirds to validate uniqueness
318 323 third = RepoNameUniqueSchema().bind(**self.bindings)
319 324 third.deserialize({'unique_repo_name': validated_name})
320 325
321 326 return appstruct
@@ -1,1089 +1,1109 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Set of generic validators
23 23 """
24 24
25 25 import logging
26 26 import os
27 27 import re
28 28 from collections import defaultdict
29 29
30 30 import formencode
31 31 import ipaddress
32 32 from formencode.validators import (
33 33 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
34 34 NotEmpty, IPAddress, CIDR, String, FancyValidator
35 35 )
36 36 from pylons.i18n.translation import _
37 37 from sqlalchemy.sql.expression import true
38 38 from sqlalchemy.util import OrderedSet
39 39 from webhelpers.pylonslib.secure_form import authentication_token
40 40
41 41 from rhodecode.authentication import (
42 42 legacy_plugin_prefix, _import_legacy_plugin)
43 43 from rhodecode.authentication.base import loadplugin
44 44 from rhodecode.config.routing import ADMIN_PREFIX
45 45 from rhodecode.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny
46 46 from rhodecode.lib.utils import repo_name_slug, make_db_config
47 47 from rhodecode.lib.utils2 import safe_int, str2bool, aslist, md5
48 48 from rhodecode.lib.vcs.backends.git.repository import GitRepository
49 49 from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
50 50 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
51 51 from rhodecode.model.db import (
52 52 RepoGroup, Repository, UserGroup, User, ChangesetStatus, Gist)
53 53 from rhodecode.model.settings import VcsSettingsModel
54 54
55 55 # silence warnings and pylint
56 56 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
57 57 NotEmpty, IPAddress, CIDR, String, FancyValidator
58 58
59 59 log = logging.getLogger(__name__)
60 60
61 61
62 62 class _Missing(object):
63 63 pass
64 64
65 65 Missing = _Missing()
66 66
67 67
68 68 class StateObj(object):
69 69 """
70 70 this is needed to translate the messages using _() in validators
71 71 """
72 72 _ = staticmethod(_)
73 73
74 74
75 75 def M(self, key, state=None, **kwargs):
76 76 """
77 77 returns string from self.message based on given key,
78 78 passed kw params are used to substitute %(named)s params inside
79 79 translated strings
80 80
81 81 :param msg:
82 82 :param state:
83 83 """
84 84 if state is None:
85 85 state = StateObj()
86 86 else:
87 87 state._ = staticmethod(_)
88 88 # inject validator into state object
89 89 return self.message(key, state, **kwargs)
90 90
91 91
92 92 def UniqueList(convert=None):
93 93 class _UniqueList(formencode.FancyValidator):
94 94 """
95 95 Unique List !
96 96 """
97 97 messages = {
98 98 'empty': _(u'Value cannot be an empty list'),
99 99 'missing_value': _(u'Value cannot be an empty list'),
100 100 }
101 101
102 102 def _to_python(self, value, state):
103 103 ret_val = []
104 104
105 105 def make_unique(value):
106 106 seen = []
107 107 return [c for c in value if not (c in seen or seen.append(c))]
108 108
109 109 if isinstance(value, list):
110 110 ret_val = make_unique(value)
111 111 elif isinstance(value, set):
112 112 ret_val = make_unique(list(value))
113 113 elif isinstance(value, tuple):
114 114 ret_val = make_unique(list(value))
115 115 elif value is None:
116 116 ret_val = []
117 117 else:
118 118 ret_val = [value]
119 119
120 120 if convert:
121 121 ret_val = map(convert, ret_val)
122 122 return ret_val
123 123
124 124 def empty_value(self, value):
125 125 return []
126 126
127 127 return _UniqueList
128 128
129 129
130 130 def UniqueListFromString():
131 131 class _UniqueListFromString(UniqueList()):
132 132 def _to_python(self, value, state):
133 133 if isinstance(value, basestring):
134 134 value = aslist(value, ',')
135 135 return super(_UniqueListFromString, self)._to_python(value, state)
136 136 return _UniqueListFromString
137 137
138 138
139 139 def ValidSvnPattern(section, repo_name=None):
140 140 class _validator(formencode.validators.FancyValidator):
141 141 messages = {
142 142 'pattern_exists': _(u'Pattern already exists'),
143 143 }
144 144
145 145 def validate_python(self, value, state):
146 146 if not value:
147 147 return
148 148 model = VcsSettingsModel(repo=repo_name)
149 149 ui_settings = model.get_svn_patterns(section=section)
150 150 for entry in ui_settings:
151 151 if value == entry.value:
152 152 msg = M(self, 'pattern_exists', state)
153 153 raise formencode.Invalid(msg, value, state)
154 154 return _validator
155 155
156 156
157 157 def ValidUsername(edit=False, old_data={}):
158 158 class _validator(formencode.validators.FancyValidator):
159 159 messages = {
160 160 'username_exists': _(u'Username "%(username)s" already exists'),
161 161 'system_invalid_username':
162 162 _(u'Username "%(username)s" is forbidden'),
163 163 'invalid_username':
164 164 _(u'Username may only contain alphanumeric characters '
165 165 u'underscores, periods or dashes and must begin with '
166 166 u'alphanumeric character or underscore')
167 167 }
168 168
169 169 def validate_python(self, value, state):
170 170 if value in ['default', 'new_user']:
171 171 msg = M(self, 'system_invalid_username', state, username=value)
172 172 raise formencode.Invalid(msg, value, state)
173 173 # check if user is unique
174 174 old_un = None
175 175 if edit:
176 176 old_un = User.get(old_data.get('user_id')).username
177 177
178 178 if old_un != value or not edit:
179 179 if User.get_by_username(value, case_insensitive=True):
180 180 msg = M(self, 'username_exists', state, username=value)
181 181 raise formencode.Invalid(msg, value, state)
182 182
183 183 if (re.match(r'^[\w]{1}[\w\-\.]{0,254}$', value)
184 184 is None):
185 185 msg = M(self, 'invalid_username', state)
186 186 raise formencode.Invalid(msg, value, state)
187 187 return _validator
188 188
189 189
190 190 def ValidRegex(msg=None):
191 191 class _validator(formencode.validators.Regex):
192 192 messages = {'invalid': msg or _(u'The input is not valid')}
193 193 return _validator
194 194
195 195
196 196 def ValidRepoUser(allow_disabled=False):
197 197 class _validator(formencode.validators.FancyValidator):
198 198 messages = {
199 199 'invalid_username': _(u'Username %(username)s is not valid'),
200 200 'disabled_username': _(u'Username %(username)s is disabled')
201 201 }
202 202
203 203 def validate_python(self, value, state):
204 204 try:
205 205 user = User.query().filter(User.username == value).one()
206 206 except Exception:
207 207 msg = M(self, 'invalid_username', state, username=value)
208 208 raise formencode.Invalid(
209 209 msg, value, state, error_dict={'username': msg}
210 210 )
211 211 if user and (not allow_disabled and not user.active):
212 212 msg = M(self, 'disabled_username', state, username=value)
213 213 raise formencode.Invalid(
214 214 msg, value, state, error_dict={'username': msg}
215 215 )
216 216
217 217 return _validator
218 218
219 219
220 220 def ValidUserGroup(edit=False, old_data={}):
221 221 class _validator(formencode.validators.FancyValidator):
222 222 messages = {
223 223 'invalid_group': _(u'Invalid user group name'),
224 224 'group_exist': _(u'User group "%(usergroup)s" already exists'),
225 225 'invalid_usergroup_name':
226 226 _(u'user group name may only contain alphanumeric '
227 227 u'characters underscores, periods or dashes and must begin '
228 228 u'with alphanumeric character')
229 229 }
230 230
231 231 def validate_python(self, value, state):
232 232 if value in ['default']:
233 233 msg = M(self, 'invalid_group', state)
234 234 raise formencode.Invalid(
235 235 msg, value, state, error_dict={'users_group_name': msg}
236 236 )
237 237 # check if group is unique
238 238 old_ugname = None
239 239 if edit:
240 240 old_id = old_data.get('users_group_id')
241 241 old_ugname = UserGroup.get(old_id).users_group_name
242 242
243 243 if old_ugname != value or not edit:
244 244 is_existing_group = UserGroup.get_by_group_name(
245 245 value, case_insensitive=True)
246 246 if is_existing_group:
247 247 msg = M(self, 'group_exist', state, usergroup=value)
248 248 raise formencode.Invalid(
249 249 msg, value, state, error_dict={'users_group_name': msg}
250 250 )
251 251
252 252 if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
253 253 msg = M(self, 'invalid_usergroup_name', state)
254 254 raise formencode.Invalid(
255 255 msg, value, state, error_dict={'users_group_name': msg}
256 256 )
257 257
258 258 return _validator
259 259
260 260
261 261 def ValidRepoGroup(edit=False, old_data={}, can_create_in_root=False):
262 262 class _validator(formencode.validators.FancyValidator):
263 263 messages = {
264 264 'group_parent_id': _(u'Cannot assign this group as parent'),
265 265 'group_exists': _(u'Group "%(group_name)s" already exists'),
266 266 'repo_exists': _(u'Repository with name "%(group_name)s" '
267 267 u'already exists'),
268 268 'permission_denied': _(u"no permission to store repository group"
269 269 u"in this location"),
270 270 'permission_denied_root': _(
271 271 u"no permission to store repository group "
272 272 u"in root location")
273 273 }
274 274
275 275 def _to_python(self, value, state):
276 276 group_name = repo_name_slug(value.get('group_name', ''))
277 277 group_parent_id = safe_int(value.get('group_parent_id'))
278 278 gr = RepoGroup.get(group_parent_id)
279 279 if gr:
280 280 parent_group_path = gr.full_path
281 281 # value needs to be aware of group name in order to check
282 282 # db key This is an actual just the name to store in the
283 283 # database
284 284 group_name_full = (
285 285 parent_group_path + RepoGroup.url_sep() + group_name)
286 286 else:
287 287 group_name_full = group_name
288 288
289 289 value['group_name'] = group_name
290 290 value['group_name_full'] = group_name_full
291 291 value['group_parent_id'] = group_parent_id
292 292 return value
293 293
294 294 def validate_python(self, value, state):
295 295
296 296 old_group_name = None
297 297 group_name = value.get('group_name')
298 298 group_name_full = value.get('group_name_full')
299 299 group_parent_id = safe_int(value.get('group_parent_id'))
300 300 if group_parent_id == -1:
301 301 group_parent_id = None
302 302
303 303 group_obj = RepoGroup.get(old_data.get('group_id'))
304 304 parent_group_changed = False
305 305 if edit:
306 306 old_group_name = group_obj.group_name
307 307 old_group_parent_id = group_obj.group_parent_id
308 308
309 309 if group_parent_id != old_group_parent_id:
310 310 parent_group_changed = True
311 311
312 312 # TODO: mikhail: the following if statement is not reached
313 313 # since group_parent_id's OneOf validation fails before.
314 314 # Can be removed.
315 315
316 316 # check against setting a parent of self
317 317 parent_of_self = (
318 318 old_data['group_id'] == group_parent_id
319 319 if group_parent_id else False
320 320 )
321 321 if parent_of_self:
322 322 msg = M(self, 'group_parent_id', state)
323 323 raise formencode.Invalid(
324 324 msg, value, state, error_dict={'group_parent_id': msg}
325 325 )
326 326
327 327 # group we're moving current group inside
328 328 child_group = None
329 329 if group_parent_id:
330 330 child_group = RepoGroup.query().filter(
331 331 RepoGroup.group_id == group_parent_id).scalar()
332 332
333 333 # do a special check that we cannot move a group to one of
334 334 # it's children
335 335 if edit and child_group:
336 336 parents = [x.group_id for x in child_group.parents]
337 337 move_to_children = old_data['group_id'] in parents
338 338 if move_to_children:
339 339 msg = M(self, 'group_parent_id', state)
340 340 raise formencode.Invalid(
341 341 msg, value, state, error_dict={'group_parent_id': msg})
342 342
343 343 # Check if we have permission to store in the parent.
344 344 # Only check if the parent group changed.
345 345 if parent_group_changed:
346 346 if child_group is None:
347 347 if not can_create_in_root:
348 348 msg = M(self, 'permission_denied_root', state)
349 349 raise formencode.Invalid(
350 350 msg, value, state,
351 351 error_dict={'group_parent_id': msg})
352 352 else:
353 353 valid = HasRepoGroupPermissionAny('group.admin')
354 354 forbidden = not valid(
355 355 child_group.group_name, 'can create group validator')
356 356 if forbidden:
357 357 msg = M(self, 'permission_denied', state)
358 358 raise formencode.Invalid(
359 359 msg, value, state,
360 360 error_dict={'group_parent_id': msg})
361 361
362 362 # if we change the name or it's new group, check for existing names
363 363 # or repositories with the same name
364 364 if old_group_name != group_name_full or not edit:
365 365 # check group
366 366 gr = RepoGroup.get_by_group_name(group_name_full)
367 367 if gr:
368 368 msg = M(self, 'group_exists', state, group_name=group_name)
369 369 raise formencode.Invalid(
370 370 msg, value, state, error_dict={'group_name': msg})
371 371
372 372 # check for same repo
373 373 repo = Repository.get_by_repo_name(group_name_full)
374 374 if repo:
375 375 msg = M(self, 'repo_exists', state, group_name=group_name)
376 376 raise formencode.Invalid(
377 377 msg, value, state, error_dict={'group_name': msg})
378 378
379 379 return _validator
380 380
381 381
382 382 def ValidPassword():
383 383 class _validator(formencode.validators.FancyValidator):
384 384 messages = {
385 385 'invalid_password':
386 386 _(u'Invalid characters (non-ascii) in password')
387 387 }
388 388
389 389 def validate_python(self, value, state):
390 390 try:
391 391 (value or '').decode('ascii')
392 392 except UnicodeError:
393 393 msg = M(self, 'invalid_password', state)
394 394 raise formencode.Invalid(msg, value, state,)
395 395 return _validator
396 396
397 397
398 398 def ValidOldPassword(username):
399 399 class _validator(formencode.validators.FancyValidator):
400 400 messages = {
401 401 'invalid_password': _(u'Invalid old password')
402 402 }
403 403
404 404 def validate_python(self, value, state):
405 405 from rhodecode.authentication.base import authenticate, HTTP_TYPE
406 406 if not authenticate(username, value, '', HTTP_TYPE):
407 407 msg = M(self, 'invalid_password', state)
408 408 raise formencode.Invalid(
409 409 msg, value, state, error_dict={'current_password': msg}
410 410 )
411 411 return _validator
412 412
413 413
414 414 def ValidPasswordsMatch(
415 415 passwd='new_password', passwd_confirmation='password_confirmation'):
416 416 class _validator(formencode.validators.FancyValidator):
417 417 messages = {
418 418 'password_mismatch': _(u'Passwords do not match'),
419 419 }
420 420
421 421 def validate_python(self, value, state):
422 422
423 423 pass_val = value.get('password') or value.get(passwd)
424 424 if pass_val != value[passwd_confirmation]:
425 425 msg = M(self, 'password_mismatch', state)
426 426 raise formencode.Invalid(
427 427 msg, value, state,
428 428 error_dict={passwd: msg, passwd_confirmation: msg}
429 429 )
430 430 return _validator
431 431
432 432
433 433 def ValidAuth():
434 434 class _validator(formencode.validators.FancyValidator):
435 435 messages = {
436 436 'invalid_password': _(u'invalid password'),
437 437 'invalid_username': _(u'invalid user name'),
438 438 'disabled_account': _(u'Your account is disabled')
439 439 }
440 440
441 441 def validate_python(self, value, state):
442 442 from rhodecode.authentication.base import authenticate, HTTP_TYPE
443 443
444 444 password = value['password']
445 445 username = value['username']
446 446
447 447 if not authenticate(username, password, '', HTTP_TYPE,
448 448 skip_missing=True):
449 449 user = User.get_by_username(username)
450 450 if user and not user.active:
451 451 log.warning('user %s is disabled', username)
452 452 msg = M(self, 'disabled_account', state)
453 453 raise formencode.Invalid(
454 454 msg, value, state, error_dict={'username': msg}
455 455 )
456 456 else:
457 457 log.warning('user `%s` failed to authenticate', username)
458 458 msg = M(self, 'invalid_username', state)
459 459 msg2 = M(self, 'invalid_password', state)
460 460 raise formencode.Invalid(
461 461 msg, value, state,
462 462 error_dict={'username': msg, 'password': msg2}
463 463 )
464 464 return _validator
465 465
466 466
467 467 def ValidAuthToken():
468 468 class _validator(formencode.validators.FancyValidator):
469 469 messages = {
470 470 'invalid_token': _(u'Token mismatch')
471 471 }
472 472
473 473 def validate_python(self, value, state):
474 474 if value != authentication_token():
475 475 msg = M(self, 'invalid_token', state)
476 476 raise formencode.Invalid(msg, value, state)
477 477 return _validator
478 478
479 479
480 480 def ValidRepoName(edit=False, old_data={}):
481 481 class _validator(formencode.validators.FancyValidator):
482 482 messages = {
483 483 'invalid_repo_name':
484 484 _(u'Repository name %(repo)s is disallowed'),
485 485 # top level
486 486 'repository_exists': _(u'Repository with name %(repo)s '
487 487 u'already exists'),
488 488 'group_exists': _(u'Repository group with name "%(repo)s" '
489 489 u'already exists'),
490 490 # inside a group
491 491 'repository_in_group_exists': _(u'Repository with name %(repo)s '
492 492 u'exists in group "%(group)s"'),
493 493 'group_in_group_exists': _(
494 494 u'Repository group with name "%(repo)s" '
495 495 u'exists in group "%(group)s"'),
496 496 }
497 497
498 498 def _to_python(self, value, state):
499 499 repo_name = repo_name_slug(value.get('repo_name', ''))
500 500 repo_group = value.get('repo_group')
501 501 if repo_group:
502 502 gr = RepoGroup.get(repo_group)
503 503 group_path = gr.full_path
504 504 group_name = gr.group_name
505 505 # value needs to be aware of group name in order to check
506 506 # db key This is an actual just the name to store in the
507 507 # database
508 508 repo_name_full = group_path + RepoGroup.url_sep() + repo_name
509 509 else:
510 510 group_name = group_path = ''
511 511 repo_name_full = repo_name
512 512
513 513 value['repo_name'] = repo_name
514 514 value['repo_name_full'] = repo_name_full
515 515 value['group_path'] = group_path
516 516 value['group_name'] = group_name
517 517 return value
518 518
519 519 def validate_python(self, value, state):
520 520
521 521 repo_name = value.get('repo_name')
522 522 repo_name_full = value.get('repo_name_full')
523 523 group_path = value.get('group_path')
524 524 group_name = value.get('group_name')
525 525
526 526 if repo_name in [ADMIN_PREFIX, '']:
527 527 msg = M(self, 'invalid_repo_name', state, repo=repo_name)
528 528 raise formencode.Invalid(
529 529 msg, value, state, error_dict={'repo_name': msg})
530 530
531 531 rename = old_data.get('repo_name') != repo_name_full
532 532 create = not edit
533 533 if rename or create:
534 534
535 535 if group_path:
536 536 if Repository.get_by_repo_name(repo_name_full):
537 537 msg = M(self, 'repository_in_group_exists', state,
538 538 repo=repo_name, group=group_name)
539 539 raise formencode.Invalid(
540 540 msg, value, state, error_dict={'repo_name': msg})
541 541 if RepoGroup.get_by_group_name(repo_name_full):
542 542 msg = M(self, 'group_in_group_exists', state,
543 543 repo=repo_name, group=group_name)
544 544 raise formencode.Invalid(
545 545 msg, value, state, error_dict={'repo_name': msg})
546 546 else:
547 547 if RepoGroup.get_by_group_name(repo_name_full):
548 548 msg = M(self, 'group_exists', state, repo=repo_name)
549 549 raise formencode.Invalid(
550 550 msg, value, state, error_dict={'repo_name': msg})
551 551
552 552 if Repository.get_by_repo_name(repo_name_full):
553 553 msg = M(
554 554 self, 'repository_exists', state, repo=repo_name)
555 555 raise formencode.Invalid(
556 556 msg, value, state, error_dict={'repo_name': msg})
557 557 return value
558 558 return _validator
559 559
560 560
561 561 def ValidForkName(*args, **kwargs):
562 562 return ValidRepoName(*args, **kwargs)
563 563
564 564
565 565 def SlugifyName():
566 566 class _validator(formencode.validators.FancyValidator):
567 567
568 568 def _to_python(self, value, state):
569 569 return repo_name_slug(value)
570 570
571 571 def validate_python(self, value, state):
572 572 pass
573 573
574 574 return _validator
575 575
576 576
577 def CannotHaveGitSuffix():
578 class _validator(formencode.validators.FancyValidator):
579 messages = {
580 'has_git_suffix':
581 _(u'Repository name cannot end with .git'),
582 }
583
584 def _to_python(self, value, state):
585 return value
586
587 def validate_python(self, value, state):
588 if value and value.endswith('.git'):
589 msg = M(
590 self, 'has_git_suffix', state)
591 raise formencode.Invalid(
592 msg, value, state, error_dict={'repo_name': msg})
593
594 return _validator
595
596
577 597 def ValidCloneUri():
578 598 class InvalidCloneUrl(Exception):
579 599 allowed_prefixes = ()
580 600
581 601 def url_handler(repo_type, url):
582 602 config = make_db_config(clear_session=False)
583 603 if repo_type == 'hg':
584 604 allowed_prefixes = ('http', 'svn+http', 'git+http')
585 605
586 606 if 'http' in url[:4]:
587 607 # initially check if it's at least the proper URL
588 608 # or does it pass basic auth
589 609 MercurialRepository.check_url(url, config)
590 610 elif 'svn+http' in url[:8]: # svn->hg import
591 611 SubversionRepository.check_url(url, config)
592 612 elif 'git+http' in url[:8]: # git->hg import
593 613 raise NotImplementedError()
594 614 else:
595 615 exc = InvalidCloneUrl('Clone from URI %s not allowed. '
596 616 'Allowed url must start with one of %s'
597 617 % (url, ','.join(allowed_prefixes)))
598 618 exc.allowed_prefixes = allowed_prefixes
599 619 raise exc
600 620
601 621 elif repo_type == 'git':
602 622 allowed_prefixes = ('http', 'svn+http', 'hg+http')
603 623 if 'http' in url[:4]:
604 624 # initially check if it's at least the proper URL
605 625 # or does it pass basic auth
606 626 GitRepository.check_url(url, config)
607 627 elif 'svn+http' in url[:8]: # svn->git import
608 628 raise NotImplementedError()
609 629 elif 'hg+http' in url[:8]: # hg->git import
610 630 raise NotImplementedError()
611 631 else:
612 632 exc = InvalidCloneUrl('Clone from URI %s not allowed. '
613 633 'Allowed url must start with one of %s'
614 634 % (url, ','.join(allowed_prefixes)))
615 635 exc.allowed_prefixes = allowed_prefixes
616 636 raise exc
617 637
618 638 class _validator(formencode.validators.FancyValidator):
619 639 messages = {
620 640 'clone_uri': _(u'invalid clone url for %(rtype)s repository'),
621 641 'invalid_clone_uri': _(
622 642 u'Invalid clone url, provide a valid clone '
623 643 u'url starting with one of %(allowed_prefixes)s')
624 644 }
625 645
626 646 def validate_python(self, value, state):
627 647 repo_type = value.get('repo_type')
628 648 url = value.get('clone_uri')
629 649
630 650 if url:
631 651 try:
632 652 url_handler(repo_type, url)
633 653 except InvalidCloneUrl as e:
634 654 log.warning(e)
635 655 msg = M(self, 'invalid_clone_uri', rtype=repo_type,
636 656 allowed_prefixes=','.join(e.allowed_prefixes))
637 657 raise formencode.Invalid(msg, value, state,
638 658 error_dict={'clone_uri': msg})
639 659 except Exception:
640 660 log.exception('Url validation failed')
641 661 msg = M(self, 'clone_uri', rtype=repo_type)
642 662 raise formencode.Invalid(msg, value, state,
643 663 error_dict={'clone_uri': msg})
644 664 return _validator
645 665
646 666
647 667 def ValidForkType(old_data={}):
648 668 class _validator(formencode.validators.FancyValidator):
649 669 messages = {
650 670 'invalid_fork_type': _(u'Fork have to be the same type as parent')
651 671 }
652 672
653 673 def validate_python(self, value, state):
654 674 if old_data['repo_type'] != value:
655 675 msg = M(self, 'invalid_fork_type', state)
656 676 raise formencode.Invalid(
657 677 msg, value, state, error_dict={'repo_type': msg}
658 678 )
659 679 return _validator
660 680
661 681
662 682 def CanWriteGroup(old_data=None):
663 683 class _validator(formencode.validators.FancyValidator):
664 684 messages = {
665 685 'permission_denied': _(
666 686 u"You do not have the permission "
667 687 u"to create repositories in this group."),
668 688 'permission_denied_root': _(
669 689 u"You do not have the permission to store repositories in "
670 690 u"the root location.")
671 691 }
672 692
673 693 def _to_python(self, value, state):
674 694 # root location
675 695 if value in [-1, "-1"]:
676 696 return None
677 697 return value
678 698
679 699 def validate_python(self, value, state):
680 700 gr = RepoGroup.get(value)
681 701 gr_name = gr.group_name if gr else None # None means ROOT location
682 702 # create repositories with write permission on group is set to true
683 703 create_on_write = HasPermissionAny(
684 704 'hg.create.write_on_repogroup.true')()
685 705 group_admin = HasRepoGroupPermissionAny('group.admin')(
686 706 gr_name, 'can write into group validator')
687 707 group_write = HasRepoGroupPermissionAny('group.write')(
688 708 gr_name, 'can write into group validator')
689 709 forbidden = not (group_admin or (group_write and create_on_write))
690 710 can_create_repos = HasPermissionAny(
691 711 'hg.admin', 'hg.create.repository')
692 712 gid = (old_data['repo_group'].get('group_id')
693 713 if (old_data and 'repo_group' in old_data) else None)
694 714 value_changed = gid != safe_int(value)
695 715 new = not old_data
696 716 # do check if we changed the value, there's a case that someone got
697 717 # revoked write permissions to a repository, he still created, we
698 718 # don't need to check permission if he didn't change the value of
699 719 # groups in form box
700 720 if value_changed or new:
701 721 # parent group need to be existing
702 722 if gr and forbidden:
703 723 msg = M(self, 'permission_denied', state)
704 724 raise formencode.Invalid(
705 725 msg, value, state, error_dict={'repo_type': msg}
706 726 )
707 727 # check if we can write to root location !
708 728 elif gr is None and not can_create_repos():
709 729 msg = M(self, 'permission_denied_root', state)
710 730 raise formencode.Invalid(
711 731 msg, value, state, error_dict={'repo_type': msg}
712 732 )
713 733
714 734 return _validator
715 735
716 736
717 737 def ValidPerms(type_='repo'):
718 738 if type_ == 'repo_group':
719 739 EMPTY_PERM = 'group.none'
720 740 elif type_ == 'repo':
721 741 EMPTY_PERM = 'repository.none'
722 742 elif type_ == 'user_group':
723 743 EMPTY_PERM = 'usergroup.none'
724 744
725 745 class _validator(formencode.validators.FancyValidator):
726 746 messages = {
727 747 'perm_new_member_name':
728 748 _(u'This username or user group name is not valid')
729 749 }
730 750
731 751 def _to_python(self, value, state):
732 752 perm_updates = OrderedSet()
733 753 perm_additions = OrderedSet()
734 754 perm_deletions = OrderedSet()
735 755 # build a list of permission to update/delete and new permission
736 756
737 757 # Read the perm_new_member/perm_del_member attributes and group
738 758 # them by they IDs
739 759 new_perms_group = defaultdict(dict)
740 760 del_perms_group = defaultdict(dict)
741 761 for k, v in value.copy().iteritems():
742 762 if k.startswith('perm_del_member'):
743 763 # delete from org storage so we don't process that later
744 764 del value[k]
745 765 # part is `id`, `type`
746 766 _type, part = k.split('perm_del_member_')
747 767 args = part.split('_')
748 768 if len(args) == 2:
749 769 _key, pos = args
750 770 del_perms_group[pos][_key] = v
751 771 if k.startswith('perm_new_member'):
752 772 # delete from org storage so we don't process that later
753 773 del value[k]
754 774 # part is `id`, `type`, `perm`
755 775 _type, part = k.split('perm_new_member_')
756 776 args = part.split('_')
757 777 if len(args) == 2:
758 778 _key, pos = args
759 779 new_perms_group[pos][_key] = v
760 780
761 781 # store the deletes
762 782 for k in sorted(del_perms_group.keys()):
763 783 perm_dict = del_perms_group[k]
764 784 del_member = perm_dict.get('id')
765 785 del_type = perm_dict.get('type')
766 786 if del_member and del_type:
767 787 perm_deletions.add((del_member, None, del_type))
768 788
769 789 # store additions in order of how they were added in web form
770 790 for k in sorted(new_perms_group.keys()):
771 791 perm_dict = new_perms_group[k]
772 792 new_member = perm_dict.get('id')
773 793 new_type = perm_dict.get('type')
774 794 new_perm = perm_dict.get('perm')
775 795 if new_member and new_perm and new_type:
776 796 perm_additions.add((new_member, new_perm, new_type))
777 797
778 798 # get updates of permissions
779 799 # (read the existing radio button states)
780 800 for k, update_value in value.iteritems():
781 801 if k.startswith('u_perm_') or k.startswith('g_perm_'):
782 802 member = k[7:]
783 803 update_type = {'u': 'user',
784 804 'g': 'users_group'}[k[0]]
785 805 if member == User.DEFAULT_USER:
786 806 if str2bool(value.get('repo_private')):
787 807 # set none for default when updating to
788 808 # private repo protects agains form manipulation
789 809 update_value = EMPTY_PERM
790 810 perm_updates.add((member, update_value, update_type))
791 811 # check the deletes
792 812
793 813 value['perm_additions'] = list(perm_additions)
794 814 value['perm_updates'] = list(perm_updates)
795 815 value['perm_deletions'] = list(perm_deletions)
796 816
797 817 # validate users they exist and they are active !
798 818 for member_id, _perm, member_type in perm_additions:
799 819 try:
800 820 if member_type == 'user':
801 821 self.user_db = User.query()\
802 822 .filter(User.active == true())\
803 823 .filter(User.user_id == member_id).one()
804 824 if member_type == 'users_group':
805 825 self.user_db = UserGroup.query()\
806 826 .filter(UserGroup.users_group_active == true())\
807 827 .filter(UserGroup.users_group_id == member_id)\
808 828 .one()
809 829
810 830 except Exception:
811 831 log.exception('Updated permission failed: org_exc:')
812 832 msg = M(self, 'perm_new_member_type', state)
813 833 raise formencode.Invalid(
814 834 msg, value, state, error_dict={
815 835 'perm_new_member_name': msg}
816 836 )
817 837 return value
818 838 return _validator
819 839
820 840
821 841 def ValidSettings():
822 842 class _validator(formencode.validators.FancyValidator):
823 843 def _to_python(self, value, state):
824 844 # settings form for users that are not admin
825 845 # can't edit certain parameters, it's extra backup if they mangle
826 846 # with forms
827 847
828 848 forbidden_params = [
829 849 'user', 'repo_type', 'repo_enable_locking',
830 850 'repo_enable_downloads', 'repo_enable_statistics'
831 851 ]
832 852
833 853 for param in forbidden_params:
834 854 if param in value:
835 855 del value[param]
836 856 return value
837 857
838 858 def validate_python(self, value, state):
839 859 pass
840 860 return _validator
841 861
842 862
843 863 def ValidPath():
844 864 class _validator(formencode.validators.FancyValidator):
845 865 messages = {
846 866 'invalid_path': _(u'This is not a valid path')
847 867 }
848 868
849 869 def validate_python(self, value, state):
850 870 if not os.path.isdir(value):
851 871 msg = M(self, 'invalid_path', state)
852 872 raise formencode.Invalid(
853 873 msg, value, state, error_dict={'paths_root_path': msg}
854 874 )
855 875 return _validator
856 876
857 877
858 878 def UniqSystemEmail(old_data={}):
859 879 class _validator(formencode.validators.FancyValidator):
860 880 messages = {
861 881 'email_taken': _(u'This e-mail address is already taken')
862 882 }
863 883
864 884 def _to_python(self, value, state):
865 885 return value.lower()
866 886
867 887 def validate_python(self, value, state):
868 888 if (old_data.get('email') or '').lower() != value:
869 889 user = User.get_by_email(value, case_insensitive=True)
870 890 if user:
871 891 msg = M(self, 'email_taken', state)
872 892 raise formencode.Invalid(
873 893 msg, value, state, error_dict={'email': msg}
874 894 )
875 895 return _validator
876 896
877 897
878 898 def ValidSystemEmail():
879 899 class _validator(formencode.validators.FancyValidator):
880 900 messages = {
881 901 'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
882 902 }
883 903
884 904 def _to_python(self, value, state):
885 905 return value.lower()
886 906
887 907 def validate_python(self, value, state):
888 908 user = User.get_by_email(value, case_insensitive=True)
889 909 if user is None:
890 910 msg = M(self, 'non_existing_email', state, email=value)
891 911 raise formencode.Invalid(
892 912 msg, value, state, error_dict={'email': msg}
893 913 )
894 914
895 915 return _validator
896 916
897 917
898 918 def NotReviewedRevisions(repo_id):
899 919 class _validator(formencode.validators.FancyValidator):
900 920 messages = {
901 921 'rev_already_reviewed':
902 922 _(u'Revisions %(revs)s are already part of pull request '
903 923 u'or have set status'),
904 924 }
905 925
906 926 def validate_python(self, value, state):
907 927 # check revisions if they are not reviewed, or a part of another
908 928 # pull request
909 929 statuses = ChangesetStatus.query()\
910 930 .filter(ChangesetStatus.revision.in_(value))\
911 931 .filter(ChangesetStatus.repo_id == repo_id)\
912 932 .all()
913 933
914 934 errors = []
915 935 for status in statuses:
916 936 if status.pull_request_id:
917 937 errors.append(['pull_req', status.revision[:12]])
918 938 elif status.status:
919 939 errors.append(['status', status.revision[:12]])
920 940
921 941 if errors:
922 942 revs = ','.join([x[1] for x in errors])
923 943 msg = M(self, 'rev_already_reviewed', state, revs=revs)
924 944 raise formencode.Invalid(
925 945 msg, value, state, error_dict={'revisions': revs})
926 946
927 947 return _validator
928 948
929 949
930 950 def ValidIp():
931 951 class _validator(CIDR):
932 952 messages = {
933 953 'badFormat': _(u'Please enter a valid IPv4 or IpV6 address'),
934 954 'illegalBits': _(
935 955 u'The network size (bits) must be within the range '
936 956 u'of 0-32 (not %(bits)r)'),
937 957 }
938 958
939 959 # we ovveride the default to_python() call
940 960 def to_python(self, value, state):
941 961 v = super(_validator, self).to_python(value, state)
942 962 v = v.strip()
943 963 net = ipaddress.ip_network(address=v, strict=False)
944 964 return str(net)
945 965
946 966 def validate_python(self, value, state):
947 967 try:
948 968 addr = value.strip()
949 969 # this raises an ValueError if address is not IpV4 or IpV6
950 970 ipaddress.ip_network(addr, strict=False)
951 971 except ValueError:
952 972 raise formencode.Invalid(self.message('badFormat', state),
953 973 value, state)
954 974
955 975 return _validator
956 976
957 977
958 978 def FieldKey():
959 979 class _validator(formencode.validators.FancyValidator):
960 980 messages = {
961 981 'badFormat': _(
962 982 u'Key name can only consist of letters, '
963 983 u'underscore, dash or numbers'),
964 984 }
965 985
966 986 def validate_python(self, value, state):
967 987 if not re.match('[a-zA-Z0-9_-]+$', value):
968 988 raise formencode.Invalid(self.message('badFormat', state),
969 989 value, state)
970 990 return _validator
971 991
972 992
973 993 def ValidAuthPlugins():
974 994 class _validator(formencode.validators.FancyValidator):
975 995 messages = {
976 996 'import_duplicate': _(
977 997 u'Plugins %(loaded)s and %(next_to_load)s '
978 998 u'both export the same name'),
979 999 'missing_includeme': _(
980 1000 u'The plugin "%(plugin_id)s" is missing an includeme '
981 1001 u'function.'),
982 1002 'import_error': _(
983 1003 u'Can not load plugin "%(plugin_id)s"'),
984 1004 'no_plugin': _(
985 1005 u'No plugin available with ID "%(plugin_id)s"'),
986 1006 }
987 1007
988 1008 def _to_python(self, value, state):
989 1009 # filter empty values
990 1010 return filter(lambda s: s not in [None, ''], value)
991 1011
992 1012 def _validate_legacy_plugin_id(self, plugin_id, value, state):
993 1013 """
994 1014 Validates that the plugin import works. It also checks that the
995 1015 plugin has an includeme attribute.
996 1016 """
997 1017 try:
998 1018 plugin = _import_legacy_plugin(plugin_id)
999 1019 except Exception as e:
1000 1020 log.exception(
1001 1021 'Exception during import of auth legacy plugin "{}"'
1002 1022 .format(plugin_id))
1003 1023 msg = M(self, 'import_error', plugin_id=plugin_id)
1004 1024 raise formencode.Invalid(msg, value, state)
1005 1025
1006 1026 if not hasattr(plugin, 'includeme'):
1007 1027 msg = M(self, 'missing_includeme', plugin_id=plugin_id)
1008 1028 raise formencode.Invalid(msg, value, state)
1009 1029
1010 1030 return plugin
1011 1031
1012 1032 def _validate_plugin_id(self, plugin_id, value, state):
1013 1033 """
1014 1034 Plugins are already imported during app start up. Therefore this
1015 1035 validation only retrieves the plugin from the plugin registry and
1016 1036 if it returns something not None everything is OK.
1017 1037 """
1018 1038 plugin = loadplugin(plugin_id)
1019 1039
1020 1040 if plugin is None:
1021 1041 msg = M(self, 'no_plugin', plugin_id=plugin_id)
1022 1042 raise formencode.Invalid(msg, value, state)
1023 1043
1024 1044 return plugin
1025 1045
1026 1046 def validate_python(self, value, state):
1027 1047 unique_names = {}
1028 1048 for plugin_id in value:
1029 1049
1030 1050 # Validate legacy or normal plugin.
1031 1051 if plugin_id.startswith(legacy_plugin_prefix):
1032 1052 plugin = self._validate_legacy_plugin_id(
1033 1053 plugin_id, value, state)
1034 1054 else:
1035 1055 plugin = self._validate_plugin_id(plugin_id, value, state)
1036 1056
1037 1057 # Only allow unique plugin names.
1038 1058 if plugin.name in unique_names:
1039 1059 msg = M(self, 'import_duplicate', state,
1040 1060 loaded=unique_names[plugin.name],
1041 1061 next_to_load=plugin)
1042 1062 raise formencode.Invalid(msg, value, state)
1043 1063 unique_names[plugin.name] = plugin
1044 1064
1045 1065 return _validator
1046 1066
1047 1067
1048 1068 def ValidPattern():
1049 1069
1050 1070 class _Validator(formencode.validators.FancyValidator):
1051 1071
1052 1072 def _to_python(self, value, state):
1053 1073 patterns = []
1054 1074
1055 1075 prefix = 'new_pattern'
1056 1076 for name, v in value.iteritems():
1057 1077 pattern_name = '_'.join((prefix, 'pattern'))
1058 1078 if name.startswith(pattern_name):
1059 1079 new_item_id = name[len(pattern_name)+1:]
1060 1080
1061 1081 def _field(name):
1062 1082 return '%s_%s_%s' % (prefix, name, new_item_id)
1063 1083
1064 1084 values = {
1065 1085 'issuetracker_pat': value.get(_field('pattern')),
1066 1086 'issuetracker_pat': value.get(_field('pattern')),
1067 1087 'issuetracker_url': value.get(_field('url')),
1068 1088 'issuetracker_pref': value.get(_field('prefix')),
1069 1089 'issuetracker_desc': value.get(_field('description'))
1070 1090 }
1071 1091 new_uid = md5(values['issuetracker_pat'])
1072 1092
1073 1093 has_required_fields = (
1074 1094 values['issuetracker_pat']
1075 1095 and values['issuetracker_url'])
1076 1096
1077 1097 if has_required_fields:
1078 1098 settings = [
1079 1099 ('_'.join((key, new_uid)), values[key], 'unicode')
1080 1100 for key in values]
1081 1101 patterns.append(settings)
1082 1102
1083 1103 value['patterns'] = patterns
1084 1104 delete_patterns = value.get('uid') or []
1085 1105 if not isinstance(delete_patterns, (list, tuple)):
1086 1106 delete_patterns = [delete_patterns]
1087 1107 value['delete_patterns'] = delete_patterns
1088 1108 return value
1089 1109 return _Validator
@@ -1,1250 +1,1264 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urllib
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.lib import auth
27 from rhodecode.lib.utils2 import safe_str, str2bool
27 from rhodecode.lib.utils2 import safe_str, str2bool, safe_unicode
28 28 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
29 29 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
30 30 Permission
31 31 from rhodecode.model.meta import Session
32 32 from rhodecode.model.repo import RepoModel
33 33 from rhodecode.model.repo_group import RepoGroupModel
34 34 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
35 35 from rhodecode.model.user import UserModel
36 36 from rhodecode.tests import (
37 37 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
39 39 logout_user_session)
40 40 from rhodecode.tests.fixture import Fixture, error_function
41 41 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
42 42
43 43 fixture = Fixture()
44 44
45 45
46 46 @pytest.mark.usefixtures("app")
47 class TestAdminRepos:
47 class TestAdminRepos(object):
48 48
49 49 def test_index(self):
50 50 self.app.get(url('repos'))
51 51
52 52 def test_create_page_restricted(self, autologin_user, backend):
53 53 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
54 54 response = self.app.get(url('new_repo'), status=200)
55 55 assert_response = AssertResponse(response)
56 56 element = assert_response.get_element('#repo_type')
57 57 assert element.text_content() == '\ngit\n'
58 58
59 59 def test_create_page_non_restricted(self, autologin_user, backend):
60 60 response = self.app.get(url('new_repo'), status=200)
61 61 assert_response = AssertResponse(response)
62 62 assert_response.element_contains('#repo_type', 'git')
63 63 assert_response.element_contains('#repo_type', 'svn')
64 64 assert_response.element_contains('#repo_type', 'hg')
65 65
66 @pytest.mark.parametrize("suffix", [u'', u''], ids=['', 'non-ascii'])
66 @pytest.mark.parametrize("suffix",
67 [u'', u'xxa'], ids=['', 'non-ascii'])
67 68 def test_create(self, autologin_user, backend, suffix, csrf_token):
68 69 repo_name_unicode = backend.new_repo_name(suffix=suffix)
69 70 repo_name = repo_name_unicode.encode('utf8')
70 71 description_unicode = u'description for newly created repo' + suffix
71 72 description = description_unicode.encode('utf8')
72 self.app.post(
73 response = self.app.post(
73 74 url('repos'),
74 75 fixture._get_repo_create_params(
75 76 repo_private=False,
76 77 repo_name=repo_name,
77 78 repo_type=backend.alias,
78 79 repo_description=description,
79 80 csrf_token=csrf_token),
80 status=302
81 )
81 status=302)
82 82
83 83 self.assert_repository_is_created_correctly(
84 84 repo_name, description, backend)
85 85
86 86 def test_create_numeric(self, autologin_user, backend, csrf_token):
87 87 numeric_repo = '1234'
88 88 repo_name = numeric_repo
89 89 description = 'description for newly created repo' + numeric_repo
90 90 self.app.post(
91 91 url('repos'),
92 92 fixture._get_repo_create_params(
93 93 repo_private=False,
94 94 repo_name=repo_name,
95 95 repo_type=backend.alias,
96 96 repo_description=description,
97 97 csrf_token=csrf_token))
98 98
99 99 self.assert_repository_is_created_correctly(
100 100 repo_name, description, backend)
101 101
102 102 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
103 103 def test_create_in_group(
104 104 self, autologin_user, backend, suffix, csrf_token):
105 105 # create GROUP
106 106 group_name = 'sometest_%s' % backend.alias
107 107 gr = RepoGroupModel().create(group_name=group_name,
108 108 group_description='test',
109 109 owner=TEST_USER_ADMIN_LOGIN)
110 110 Session().commit()
111 111
112 112 repo_name = u'ingroup' + suffix
113 113 repo_name_full = RepoGroup.url_sep().join(
114 114 [group_name, repo_name])
115 115 description = u'description for newly created repo'
116 116 self.app.post(
117 117 url('repos'),
118 118 fixture._get_repo_create_params(
119 119 repo_private=False,
120 120 repo_name=safe_str(repo_name),
121 121 repo_type=backend.alias,
122 122 repo_description=description,
123 123 repo_group=gr.group_id,
124 124 csrf_token=csrf_token))
125 125
126 126 # TODO: johbo: Cleanup work to fixture
127 127 try:
128 128 self.assert_repository_is_created_correctly(
129 129 repo_name_full, description, backend)
130 130
131 131 new_repo = RepoModel().get_by_repo_name(repo_name_full)
132 132 inherited_perms = UserRepoToPerm.query().filter(
133 133 UserRepoToPerm.repository_id == new_repo.repo_id).all()
134 134 assert len(inherited_perms) == 1
135 135 finally:
136 136 RepoModel().delete(repo_name_full)
137 137 RepoGroupModel().delete(group_name)
138 138 Session().commit()
139 139
140 140 def test_create_in_group_numeric(
141 141 self, autologin_user, backend, csrf_token):
142 142 # create GROUP
143 143 group_name = 'sometest_%s' % backend.alias
144 144 gr = RepoGroupModel().create(group_name=group_name,
145 145 group_description='test',
146 146 owner=TEST_USER_ADMIN_LOGIN)
147 147 Session().commit()
148 148
149 149 repo_name = '12345'
150 150 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
151 151 description = 'description for newly created repo'
152 152 self.app.post(
153 153 url('repos'),
154 154 fixture._get_repo_create_params(
155 155 repo_private=False,
156 156 repo_name=repo_name,
157 157 repo_type=backend.alias,
158 158 repo_description=description,
159 159 repo_group=gr.group_id,
160 160 csrf_token=csrf_token))
161 161
162 162 # TODO: johbo: Cleanup work to fixture
163 163 try:
164 164 self.assert_repository_is_created_correctly(
165 165 repo_name_full, description, backend)
166 166
167 167 new_repo = RepoModel().get_by_repo_name(repo_name_full)
168 168 inherited_perms = UserRepoToPerm.query()\
169 169 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
170 170 assert len(inherited_perms) == 1
171 171 finally:
172 172 RepoModel().delete(repo_name_full)
173 173 RepoGroupModel().delete(group_name)
174 174 Session().commit()
175 175
176 176 def test_create_in_group_without_needed_permissions(self, backend):
177 177 session = login_user_session(
178 178 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
179 179 csrf_token = auth.get_csrf_token(session)
180 180 # revoke
181 181 user_model = UserModel()
182 182 # disable fork and create on default user
183 183 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
184 184 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
185 185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
186 186 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
187 187
188 188 # disable on regular user
189 189 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
190 190 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
191 191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
192 192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
193 193 Session().commit()
194 194
195 195 # create GROUP
196 196 group_name = 'reg_sometest_%s' % backend.alias
197 197 gr = RepoGroupModel().create(group_name=group_name,
198 198 group_description='test',
199 199 owner=TEST_USER_ADMIN_LOGIN)
200 200 Session().commit()
201 201
202 202 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
203 203 gr_allowed = RepoGroupModel().create(
204 204 group_name=group_name_allowed,
205 205 group_description='test',
206 206 owner=TEST_USER_REGULAR_LOGIN)
207 207 Session().commit()
208 208
209 209 repo_name = 'ingroup'
210 210 description = 'description for newly created repo'
211 211 response = self.app.post(
212 212 url('repos'),
213 213 fixture._get_repo_create_params(
214 214 repo_private=False,
215 215 repo_name=repo_name,
216 216 repo_type=backend.alias,
217 217 repo_description=description,
218 218 repo_group=gr.group_id,
219 219 csrf_token=csrf_token))
220 220
221 221 response.mustcontain('Invalid value')
222 222
223 223 # user is allowed to create in this group
224 224 repo_name = 'ingroup'
225 225 repo_name_full = RepoGroup.url_sep().join(
226 226 [group_name_allowed, repo_name])
227 227 description = 'description for newly created repo'
228 228 response = self.app.post(
229 229 url('repos'),
230 230 fixture._get_repo_create_params(
231 231 repo_private=False,
232 232 repo_name=repo_name,
233 233 repo_type=backend.alias,
234 234 repo_description=description,
235 235 repo_group=gr_allowed.group_id,
236 236 csrf_token=csrf_token))
237 237
238 238 # TODO: johbo: Cleanup in pytest fixture
239 239 try:
240 240 self.assert_repository_is_created_correctly(
241 241 repo_name_full, description, backend)
242 242
243 243 new_repo = RepoModel().get_by_repo_name(repo_name_full)
244 244 inherited_perms = UserRepoToPerm.query().filter(
245 245 UserRepoToPerm.repository_id == new_repo.repo_id).all()
246 246 assert len(inherited_perms) == 1
247 247
248 248 assert repo_on_filesystem(repo_name_full)
249 249 finally:
250 250 RepoModel().delete(repo_name_full)
251 251 RepoGroupModel().delete(group_name)
252 252 RepoGroupModel().delete(group_name_allowed)
253 253 Session().commit()
254 254
255 255 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
256 256 csrf_token):
257 257 # create GROUP
258 258 group_name = 'sometest_%s' % backend.alias
259 259 gr = RepoGroupModel().create(group_name=group_name,
260 260 group_description='test',
261 261 owner=TEST_USER_ADMIN_LOGIN)
262 262 perm = Permission.get_by_key('repository.write')
263 263 RepoGroupModel().grant_user_permission(
264 264 gr, TEST_USER_REGULAR_LOGIN, perm)
265 265
266 266 # add repo permissions
267 267 Session().commit()
268 268
269 269 repo_name = 'ingroup_inherited_%s' % backend.alias
270 270 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
271 271 description = 'description for newly created repo'
272 272 self.app.post(
273 273 url('repos'),
274 274 fixture._get_repo_create_params(
275 275 repo_private=False,
276 276 repo_name=repo_name,
277 277 repo_type=backend.alias,
278 278 repo_description=description,
279 279 repo_group=gr.group_id,
280 280 repo_copy_permissions=True,
281 281 csrf_token=csrf_token))
282 282
283 283 # TODO: johbo: Cleanup to pytest fixture
284 284 try:
285 285 self.assert_repository_is_created_correctly(
286 286 repo_name_full, description, backend)
287 287 except Exception:
288 288 RepoGroupModel().delete(group_name)
289 289 Session().commit()
290 290 raise
291 291
292 292 # check if inherited permissions are applied
293 293 new_repo = RepoModel().get_by_repo_name(repo_name_full)
294 294 inherited_perms = UserRepoToPerm.query().filter(
295 295 UserRepoToPerm.repository_id == new_repo.repo_id).all()
296 296 assert len(inherited_perms) == 2
297 297
298 298 assert TEST_USER_REGULAR_LOGIN in [
299 299 x.user.username for x in inherited_perms]
300 300 assert 'repository.write' in [
301 301 x.permission.permission_name for x in inherited_perms]
302 302
303 303 RepoModel().delete(repo_name_full)
304 304 RepoGroupModel().delete(group_name)
305 305 Session().commit()
306 306
307 307 @pytest.mark.xfail_backends(
308 308 "git", "hg", reason="Missing reposerver support")
309 309 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
310 310 csrf_token):
311 311 source_repo = backend.create_repo(number_of_commits=2)
312 312 source_repo_name = source_repo.repo_name
313 313 reposerver.serve(source_repo.scm_instance())
314 314
315 315 repo_name = backend.new_repo_name()
316 316 response = self.app.post(
317 317 url('repos'),
318 318 fixture._get_repo_create_params(
319 319 repo_private=False,
320 320 repo_name=repo_name,
321 321 repo_type=backend.alias,
322 322 repo_description='',
323 323 clone_uri=reposerver.url,
324 324 csrf_token=csrf_token),
325 325 status=302)
326 326
327 327 # Should be redirected to the creating page
328 328 response.mustcontain('repo_creating')
329 329
330 330 # Expecting that both repositories have same history
331 331 source_repo = RepoModel().get_by_repo_name(source_repo_name)
332 332 source_vcs = source_repo.scm_instance()
333 333 repo = RepoModel().get_by_repo_name(repo_name)
334 334 repo_vcs = repo.scm_instance()
335 335 assert source_vcs[0].message == repo_vcs[0].message
336 336 assert source_vcs.count() == repo_vcs.count()
337 337 assert source_vcs.commit_ids == repo_vcs.commit_ids
338 338
339 339 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
340 340 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
341 341 csrf_token):
342 342 repo_name = backend.new_repo_name()
343 343 description = 'description for newly created repo'
344 344 response = self.app.post(
345 345 url('repos'),
346 346 fixture._get_repo_create_params(
347 347 repo_private=False,
348 348 repo_name=repo_name,
349 349 repo_type=backend.alias,
350 350 repo_description=description,
351 351 clone_uri='http://repo.invalid/repo',
352 352 csrf_token=csrf_token))
353 353 response.mustcontain('invalid clone url')
354 354
355 355 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
356 356 def test_create_remote_repo_wrong_clone_uri_hg_svn(
357 357 self, autologin_user, backend, csrf_token):
358 358 repo_name = backend.new_repo_name()
359 359 description = 'description for newly created repo'
360 360 response = self.app.post(
361 361 url('repos'),
362 362 fixture._get_repo_create_params(
363 363 repo_private=False,
364 364 repo_name=repo_name,
365 365 repo_type=backend.alias,
366 366 repo_description=description,
367 367 clone_uri='svn+http://svn.invalid/repo',
368 368 csrf_token=csrf_token))
369 369 response.mustcontain('invalid clone url')
370 370
371 def test_create_with_git_suffix(
372 self, autologin_user, backend, csrf_token):
373 repo_name = backend.new_repo_name() + ".git"
374 description = 'description for newly created repo'
375 response = self.app.post(
376 url('repos'),
377 fixture._get_repo_create_params(
378 repo_private=False,
379 repo_name=repo_name,
380 repo_type=backend.alias,
381 repo_description=description,
382 csrf_token=csrf_token))
383 response.mustcontain('Repository name cannot end with .git')
384
371 385 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
372 386 def test_delete(self, autologin_user, backend, suffix, csrf_token):
373 387 repo = backend.create_repo(name_suffix=suffix)
374 388 repo_name = repo.repo_name
375 389
376 390 response = self.app.post(url('repo', repo_name=repo_name),
377 391 params={'_method': 'delete',
378 392 'csrf_token': csrf_token})
379 393 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
380 394 response.follow()
381 395
382 396 # check if repo was deleted from db
383 397 assert RepoModel().get_by_repo_name(repo_name) is None
384 398 assert not repo_on_filesystem(repo_name)
385 399
386 400 def test_show(self, autologin_user, backend):
387 401 self.app.get(url('repo', repo_name=backend.repo_name))
388 402
389 403 def test_edit(self, backend, autologin_user):
390 404 self.app.get(url('edit_repo', repo_name=backend.repo_name))
391 405
392 406 def test_edit_accessible_when_missing_requirements(
393 407 self, backend_hg, autologin_user):
394 408 scm_patcher = mock.patch.object(
395 409 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
396 410 with scm_patcher:
397 411 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
398 412
399 413 def test_set_private_flag_sets_default_to_none(
400 414 self, autologin_user, backend, csrf_token):
401 415 # initially repository perm should be read
402 416 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
403 417 assert len(perm) == 1
404 418 assert perm[0].permission.permission_name == 'repository.read'
405 419 assert not backend.repo.private
406 420
407 421 response = self.app.post(
408 422 url('repo', repo_name=backend.repo_name),
409 423 fixture._get_repo_create_params(
410 424 repo_private=1,
411 425 repo_name=backend.repo_name,
412 426 repo_type=backend.alias,
413 427 user=TEST_USER_ADMIN_LOGIN,
414 428 _method='put',
415 429 csrf_token=csrf_token))
416 430 assert_session_flash(
417 431 response,
418 432 msg='Repository %s updated successfully' % (backend.repo_name))
419 433 assert backend.repo.private
420 434
421 435 # now the repo default permission should be None
422 436 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
423 437 assert len(perm) == 1
424 438 assert perm[0].permission.permission_name == 'repository.none'
425 439
426 440 response = self.app.post(
427 441 url('repo', repo_name=backend.repo_name),
428 442 fixture._get_repo_create_params(
429 443 repo_private=False,
430 444 repo_name=backend.repo_name,
431 445 repo_type=backend.alias,
432 446 user=TEST_USER_ADMIN_LOGIN,
433 447 _method='put',
434 448 csrf_token=csrf_token))
435 449 assert_session_flash(
436 450 response,
437 451 msg='Repository %s updated successfully' % (backend.repo_name))
438 452 assert not backend.repo.private
439 453
440 454 # we turn off private now the repo default permission should stay None
441 455 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
442 456 assert len(perm) == 1
443 457 assert perm[0].permission.permission_name == 'repository.none'
444 458
445 459 # update this permission back
446 460 perm[0].permission = Permission.get_by_key('repository.read')
447 461 Session().add(perm[0])
448 462 Session().commit()
449 463
450 464 def test_default_user_cannot_access_private_repo_in_a_group(
451 465 self, autologin_user, user_util, backend, csrf_token):
452 466
453 467 group = user_util.create_repo_group()
454 468
455 469 repo = backend.create_repo(
456 470 repo_private=True, repo_group=group, repo_copy_permissions=True)
457 471
458 472 permissions = _get_permission_for_user(
459 473 user='default', repo=repo.repo_name)
460 474 assert len(permissions) == 1
461 475 assert permissions[0].permission.permission_name == 'repository.none'
462 476 assert permissions[0].repository.private is True
463 477
464 478 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
465 479 repo = backend.repo
466 480 response = self.app.get(
467 481 url('edit_repo_advanced', repo_name=backend.repo_name))
468 482 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
469 483 response.mustcontain(no=[opt])
470 484
471 485 def test_set_fork_of_target_repo(
472 486 self, autologin_user, backend, csrf_token):
473 487 target_repo = 'target_%s' % backend.alias
474 488 fixture.create_repo(target_repo, repo_type=backend.alias)
475 489 repo2 = Repository.get_by_repo_name(target_repo)
476 490 response = self.app.post(
477 491 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
478 492 params={'id_fork_of': repo2.repo_id, '_method': 'put',
479 493 'csrf_token': csrf_token})
480 494 repo = Repository.get_by_repo_name(backend.repo_name)
481 495 repo2 = Repository.get_by_repo_name(target_repo)
482 496 assert_session_flash(
483 497 response,
484 498 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
485 499
486 500 assert repo.fork == repo2
487 501 response = response.follow()
488 502 # check if given repo is selected
489 503
490 504 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
491 505 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
492 506
493 507 response.mustcontain(opt)
494 508
495 509 fixture.destroy_repo(target_repo, forks='detach')
496 510
497 511 @pytest.mark.backends("hg", "git")
498 512 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
499 513 csrf_token):
500 514 TARGET_REPO_MAP = {
501 515 'git': {
502 516 'type': 'hg',
503 517 'repo_name': HG_REPO},
504 518 'hg': {
505 519 'type': 'git',
506 520 'repo_name': GIT_REPO},
507 521 }
508 522 target_repo = TARGET_REPO_MAP[backend.alias]
509 523
510 524 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
511 525 response = self.app.post(
512 526 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
513 527 params={'id_fork_of': repo2.repo_id, '_method': 'put',
514 528 'csrf_token': csrf_token})
515 529 assert_session_flash(
516 530 response,
517 531 'Cannot set repository as fork of repository with other type')
518 532
519 533 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
520 534 # mark it as None
521 535 response = self.app.post(
522 536 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
523 537 params={'id_fork_of': None, '_method': 'put',
524 538 'csrf_token': csrf_token})
525 539 assert_session_flash(
526 540 response,
527 541 'Marked repo %s as fork of %s'
528 542 % (backend.repo_name, "Nothing"))
529 543 assert backend.repo.fork is None
530 544
531 545 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
532 546 repo = Repository.get_by_repo_name(backend.repo_name)
533 547 response = self.app.post(
534 548 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
535 549 params={'id_fork_of': repo.repo_id, '_method': 'put',
536 550 'csrf_token': csrf_token})
537 551 assert_session_flash(
538 552 response, 'An error occurred during this operation')
539 553
540 554 def test_create_on_top_level_without_permissions(self, backend):
541 555 session = login_user_session(
542 556 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
543 557 csrf_token = auth.get_csrf_token(session)
544 558
545 559 # revoke
546 560 user_model = UserModel()
547 561 # disable fork and create on default user
548 562 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
549 563 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
550 564 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
551 565 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
552 566
553 567 # disable on regular user
554 568 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
555 569 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
556 570 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
557 571 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
558 572 Session().commit()
559 573
560 574 repo_name = backend.new_repo_name()
561 575 description = 'description for newly created repo'
562 576 response = self.app.post(
563 577 url('repos'),
564 578 fixture._get_repo_create_params(
565 579 repo_private=False,
566 580 repo_name=repo_name,
567 581 repo_type=backend.alias,
568 582 repo_description=description,
569 583 csrf_token=csrf_token))
570 584
571 585 response.mustcontain(
572 586 u"You do not have the permission to store repositories in "
573 587 u"the root location.")
574 588
575 589 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
576 590 def test_create_repo_when_filesystem_op_fails(
577 591 self, autologin_user, backend, csrf_token):
578 592 repo_name = backend.new_repo_name()
579 593 description = 'description for newly created repo'
580 594
581 595 response = self.app.post(
582 596 url('repos'),
583 597 fixture._get_repo_create_params(
584 598 repo_private=False,
585 599 repo_name=repo_name,
586 600 repo_type=backend.alias,
587 601 repo_description=description,
588 602 csrf_token=csrf_token))
589 603
590 604 assert_session_flash(
591 605 response, 'Error creating repository %s' % repo_name)
592 606 # repo must not be in db
593 607 assert backend.repo is None
594 608 # repo must not be in filesystem !
595 609 assert not repo_on_filesystem(repo_name)
596 610
597 611 def assert_repository_is_created_correctly(
598 612 self, repo_name, description, backend):
599 repo_name_utf8 = repo_name.encode('utf-8')
613 repo_name_utf8 = safe_str(repo_name)
600 614
601 615 # run the check page that triggers the flash message
602 616 response = self.app.get(url('repo_check_home', repo_name=repo_name))
603 617 assert response.json == {u'result': True}
604 assert_session_flash(
605 response,
606 u'Created repository <a href="/%s">%s</a>'
607 % (urllib.quote(repo_name_utf8), repo_name))
618
619 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
620 urllib.quote(repo_name_utf8), repo_name)
621 assert_session_flash(response, flash_msg)
608 622
609 623 # test if the repo was created in the database
610 624 new_repo = RepoModel().get_by_repo_name(repo_name)
611 625
612 626 assert new_repo.repo_name == repo_name
613 627 assert new_repo.description == description
614 628
615 629 # test if the repository is visible in the list ?
616 630 response = self.app.get(url('summary_home', repo_name=repo_name))
617 631 response.mustcontain(repo_name)
618 632 response.mustcontain(backend.alias)
619 633
620 634 assert repo_on_filesystem(repo_name)
621 635
622 636
623 637 @pytest.mark.usefixtures("app")
624 638 class TestVcsSettings(object):
625 639 FORM_DATA = {
626 640 'inherit_global_settings': False,
627 641 'hooks_changegroup_repo_size': False,
628 642 'hooks_changegroup_push_logger': False,
629 643 'hooks_outgoing_pull_logger': False,
630 644 'extensions_largefiles': False,
631 645 'phases_publish': 'False',
632 646 'rhodecode_pr_merge_enabled': False,
633 647 'rhodecode_use_outdated_comments': False,
634 648 'new_svn_branch': '',
635 649 'new_svn_tag': ''
636 650 }
637 651
638 652 @pytest.mark.skip_backends('svn')
639 653 def test_global_settings_initial_values(self, autologin_user, backend):
640 654 repo_name = backend.repo_name
641 655 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
642 656
643 657 expected_settings = (
644 658 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
645 659 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
646 660 'hooks_outgoing_pull_logger'
647 661 )
648 662 for setting in expected_settings:
649 663 self.assert_repo_value_equals_global_value(response, setting)
650 664
651 665 def test_show_settings_requires_repo_admin_permission(
652 666 self, backend, user_util, settings_util):
653 667 repo = backend.create_repo()
654 668 repo_name = repo.repo_name
655 669 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
656 670 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
657 671 login_user_session(
658 672 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
659 673 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
660 674
661 675 def test_inherit_global_settings_flag_is_true_by_default(
662 676 self, autologin_user, backend):
663 677 repo_name = backend.repo_name
664 678 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
665 679
666 680 assert_response = AssertResponse(response)
667 681 element = assert_response.get_element('#inherit_global_settings')
668 682 assert element.checked
669 683
670 684 @pytest.mark.parametrize('checked_value', [True, False])
671 685 def test_inherit_global_settings_value(
672 686 self, autologin_user, backend, checked_value, settings_util):
673 687 repo = backend.create_repo()
674 688 repo_name = repo.repo_name
675 689 settings_util.create_repo_rhodecode_setting(
676 690 repo, 'inherit_vcs_settings', checked_value, 'bool')
677 691 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
678 692
679 693 assert_response = AssertResponse(response)
680 694 element = assert_response.get_element('#inherit_global_settings')
681 695 assert element.checked == checked_value
682 696
683 697 @pytest.mark.skip_backends('svn')
684 698 def test_hooks_settings_are_created(
685 699 self, autologin_user, backend, csrf_token):
686 700 repo_name = backend.repo_name
687 701 data = self.FORM_DATA.copy()
688 702 data['csrf_token'] = csrf_token
689 703 self.app.post(
690 704 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
691 705 settings = SettingsModel(repo=repo_name)
692 706 try:
693 707 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
694 708 ui = settings.get_ui_by_section_and_key(section, key)
695 709 assert ui.ui_active is False
696 710 finally:
697 711 self._cleanup_repo_settings(settings)
698 712
699 713 def test_hooks_settings_are_not_created_for_svn(
700 714 self, autologin_user, backend_svn, csrf_token):
701 715 repo_name = backend_svn.repo_name
702 716 data = self.FORM_DATA.copy()
703 717 data['csrf_token'] = csrf_token
704 718 self.app.post(
705 719 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
706 720 settings = SettingsModel(repo=repo_name)
707 721 try:
708 722 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
709 723 ui = settings.get_ui_by_section_and_key(section, key)
710 724 assert ui is None
711 725 finally:
712 726 self._cleanup_repo_settings(settings)
713 727
714 728 @pytest.mark.skip_backends('svn')
715 729 def test_hooks_settings_are_updated(
716 730 self, autologin_user, backend, csrf_token):
717 731 repo_name = backend.repo_name
718 732 settings = SettingsModel(repo=repo_name)
719 733 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
720 734 settings.create_ui_section_value(section, '', key=key, active=True)
721 735
722 736 data = self.FORM_DATA.copy()
723 737 data['csrf_token'] = csrf_token
724 738 self.app.post(
725 739 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
726 740 try:
727 741 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
728 742 ui = settings.get_ui_by_section_and_key(section, key)
729 743 assert ui.ui_active is False
730 744 finally:
731 745 self._cleanup_repo_settings(settings)
732 746
733 747 def test_hooks_settings_are_not_updated_for_svn(
734 748 self, autologin_user, backend_svn, csrf_token):
735 749 repo_name = backend_svn.repo_name
736 750 settings = SettingsModel(repo=repo_name)
737 751 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
738 752 settings.create_ui_section_value(section, '', key=key, active=True)
739 753
740 754 data = self.FORM_DATA.copy()
741 755 data['csrf_token'] = csrf_token
742 756 self.app.post(
743 757 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
744 758 try:
745 759 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
746 760 ui = settings.get_ui_by_section_and_key(section, key)
747 761 assert ui.ui_active is True
748 762 finally:
749 763 self._cleanup_repo_settings(settings)
750 764
751 765 @pytest.mark.skip_backends('svn')
752 766 def test_pr_settings_are_created(
753 767 self, autologin_user, backend, csrf_token):
754 768 repo_name = backend.repo_name
755 769 data = self.FORM_DATA.copy()
756 770 data['csrf_token'] = csrf_token
757 771 self.app.post(
758 772 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
759 773 settings = SettingsModel(repo=repo_name)
760 774 try:
761 775 for name in VcsSettingsModel.GENERAL_SETTINGS:
762 776 setting = settings.get_setting_by_name(name)
763 777 assert setting.app_settings_value is False
764 778 finally:
765 779 self._cleanup_repo_settings(settings)
766 780
767 781 def test_pr_settings_are_not_created_for_svn(
768 782 self, autologin_user, backend_svn, csrf_token):
769 783 repo_name = backend_svn.repo_name
770 784 data = self.FORM_DATA.copy()
771 785 data['csrf_token'] = csrf_token
772 786 self.app.post(
773 787 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
774 788 settings = SettingsModel(repo=repo_name)
775 789 try:
776 790 for name in VcsSettingsModel.GENERAL_SETTINGS:
777 791 setting = settings.get_setting_by_name(name)
778 792 assert setting is None
779 793 finally:
780 794 self._cleanup_repo_settings(settings)
781 795
782 796 def test_pr_settings_creation_requires_repo_admin_permission(
783 797 self, backend, user_util, settings_util, csrf_token):
784 798 repo = backend.create_repo()
785 799 repo_name = repo.repo_name
786 800 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
787 801
788 802 logout_user_session(self.app, csrf_token)
789 803 session = login_user_session(
790 804 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
791 805 new_csrf_token = auth.get_csrf_token(session)
792 806
793 807 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
794 808 data = self.FORM_DATA.copy()
795 809 data['csrf_token'] = new_csrf_token
796 810 settings = SettingsModel(repo=repo_name)
797 811
798 812 try:
799 813 self.app.post(
800 814 url('repo_vcs_settings', repo_name=repo_name), data,
801 815 status=302)
802 816 finally:
803 817 self._cleanup_repo_settings(settings)
804 818
805 819 @pytest.mark.skip_backends('svn')
806 820 def test_pr_settings_are_updated(
807 821 self, autologin_user, backend, csrf_token):
808 822 repo_name = backend.repo_name
809 823 settings = SettingsModel(repo=repo_name)
810 824 for name in VcsSettingsModel.GENERAL_SETTINGS:
811 825 settings.create_or_update_setting(name, True, 'bool')
812 826
813 827 data = self.FORM_DATA.copy()
814 828 data['csrf_token'] = csrf_token
815 829 self.app.post(
816 830 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
817 831 try:
818 832 for name in VcsSettingsModel.GENERAL_SETTINGS:
819 833 setting = settings.get_setting_by_name(name)
820 834 assert setting.app_settings_value is False
821 835 finally:
822 836 self._cleanup_repo_settings(settings)
823 837
824 838 def test_pr_settings_are_not_updated_for_svn(
825 839 self, autologin_user, backend_svn, csrf_token):
826 840 repo_name = backend_svn.repo_name
827 841 settings = SettingsModel(repo=repo_name)
828 842 for name in VcsSettingsModel.GENERAL_SETTINGS:
829 843 settings.create_or_update_setting(name, True, 'bool')
830 844
831 845 data = self.FORM_DATA.copy()
832 846 data['csrf_token'] = csrf_token
833 847 self.app.post(
834 848 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
835 849 try:
836 850 for name in VcsSettingsModel.GENERAL_SETTINGS:
837 851 setting = settings.get_setting_by_name(name)
838 852 assert setting.app_settings_value is True
839 853 finally:
840 854 self._cleanup_repo_settings(settings)
841 855
842 856 def test_svn_settings_are_created(
843 857 self, autologin_user, backend_svn, csrf_token, settings_util):
844 858 repo_name = backend_svn.repo_name
845 859 data = self.FORM_DATA.copy()
846 860 data['new_svn_tag'] = 'svn-tag'
847 861 data['new_svn_branch'] = 'svn-branch'
848 862 data['csrf_token'] = csrf_token
849 863
850 864 # Create few global settings to make sure that uniqueness validators
851 865 # are not triggered
852 866 settings_util.create_rhodecode_ui(
853 867 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
854 868 settings_util.create_rhodecode_ui(
855 869 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
856 870
857 871 self.app.post(
858 872 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
859 873 settings = SettingsModel(repo=repo_name)
860 874 try:
861 875 svn_branches = settings.get_ui_by_section(
862 876 VcsSettingsModel.SVN_BRANCH_SECTION)
863 877 svn_branch_names = [b.ui_value for b in svn_branches]
864 878 svn_tags = settings.get_ui_by_section(
865 879 VcsSettingsModel.SVN_TAG_SECTION)
866 880 svn_tag_names = [b.ui_value for b in svn_tags]
867 881 assert 'svn-branch' in svn_branch_names
868 882 assert 'svn-tag' in svn_tag_names
869 883 finally:
870 884 self._cleanup_repo_settings(settings)
871 885
872 886 def test_svn_settings_are_unique(
873 887 self, autologin_user, backend_svn, csrf_token, settings_util):
874 888 repo = backend_svn.repo
875 889 repo_name = repo.repo_name
876 890 data = self.FORM_DATA.copy()
877 891 data['new_svn_tag'] = 'test_tag'
878 892 data['new_svn_branch'] = 'test_branch'
879 893 data['csrf_token'] = csrf_token
880 894 settings_util.create_repo_rhodecode_ui(
881 895 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
882 896 settings_util.create_repo_rhodecode_ui(
883 897 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
884 898
885 899 response = self.app.post(
886 900 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
887 901 response.mustcontain('Pattern already exists')
888 902
889 903 def test_svn_settings_with_empty_values_are_not_created(
890 904 self, autologin_user, backend_svn, csrf_token):
891 905 repo_name = backend_svn.repo_name
892 906 data = self.FORM_DATA.copy()
893 907 data['csrf_token'] = csrf_token
894 908 self.app.post(
895 909 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
896 910 settings = SettingsModel(repo=repo_name)
897 911 try:
898 912 svn_branches = settings.get_ui_by_section(
899 913 VcsSettingsModel.SVN_BRANCH_SECTION)
900 914 svn_tags = settings.get_ui_by_section(
901 915 VcsSettingsModel.SVN_TAG_SECTION)
902 916 assert len(svn_branches) == 0
903 917 assert len(svn_tags) == 0
904 918 finally:
905 919 self._cleanup_repo_settings(settings)
906 920
907 921 def test_svn_settings_are_shown_for_svn_repository(
908 922 self, autologin_user, backend_svn, csrf_token):
909 923 repo_name = backend_svn.repo_name
910 924 response = self.app.get(
911 925 url('repo_vcs_settings', repo_name=repo_name), status=200)
912 926 response.mustcontain('Subversion Settings')
913 927
914 928 @pytest.mark.skip_backends('svn')
915 929 def test_svn_settings_are_not_created_for_not_svn_repository(
916 930 self, autologin_user, backend, csrf_token):
917 931 repo_name = backend.repo_name
918 932 data = self.FORM_DATA.copy()
919 933 data['csrf_token'] = csrf_token
920 934 self.app.post(
921 935 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
922 936 settings = SettingsModel(repo=repo_name)
923 937 try:
924 938 svn_branches = settings.get_ui_by_section(
925 939 VcsSettingsModel.SVN_BRANCH_SECTION)
926 940 svn_tags = settings.get_ui_by_section(
927 941 VcsSettingsModel.SVN_TAG_SECTION)
928 942 assert len(svn_branches) == 0
929 943 assert len(svn_tags) == 0
930 944 finally:
931 945 self._cleanup_repo_settings(settings)
932 946
933 947 @pytest.mark.skip_backends('svn')
934 948 def test_svn_settings_are_shown_only_for_svn_repository(
935 949 self, autologin_user, backend, csrf_token):
936 950 repo_name = backend.repo_name
937 951 response = self.app.get(
938 952 url('repo_vcs_settings', repo_name=repo_name), status=200)
939 953 response.mustcontain(no='Subversion Settings')
940 954
941 955 def test_hg_settings_are_created(
942 956 self, autologin_user, backend_hg, csrf_token):
943 957 repo_name = backend_hg.repo_name
944 958 data = self.FORM_DATA.copy()
945 959 data['new_svn_tag'] = 'svn-tag'
946 960 data['new_svn_branch'] = 'svn-branch'
947 961 data['csrf_token'] = csrf_token
948 962 self.app.post(
949 963 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
950 964 settings = SettingsModel(repo=repo_name)
951 965 try:
952 966 largefiles_ui = settings.get_ui_by_section_and_key(
953 967 'extensions', 'largefiles')
954 968 assert largefiles_ui.ui_active is False
955 969 phases_ui = settings.get_ui_by_section_and_key(
956 970 'phases', 'publish')
957 971 assert str2bool(phases_ui.ui_value) is False
958 972 finally:
959 973 self._cleanup_repo_settings(settings)
960 974
961 975 def test_hg_settings_are_updated(
962 976 self, autologin_user, backend_hg, csrf_token):
963 977 repo_name = backend_hg.repo_name
964 978 settings = SettingsModel(repo=repo_name)
965 979 settings.create_ui_section_value(
966 980 'extensions', '', key='largefiles', active=True)
967 981 settings.create_ui_section_value(
968 982 'phases', '1', key='publish', active=True)
969 983
970 984 data = self.FORM_DATA.copy()
971 985 data['csrf_token'] = csrf_token
972 986 self.app.post(
973 987 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
974 988 try:
975 989 largefiles_ui = settings.get_ui_by_section_and_key(
976 990 'extensions', 'largefiles')
977 991 assert largefiles_ui.ui_active is False
978 992 phases_ui = settings.get_ui_by_section_and_key(
979 993 'phases', 'publish')
980 994 assert str2bool(phases_ui.ui_value) is False
981 995 finally:
982 996 self._cleanup_repo_settings(settings)
983 997
984 998 def test_hg_settings_are_shown_for_hg_repository(
985 999 self, autologin_user, backend_hg, csrf_token):
986 1000 repo_name = backend_hg.repo_name
987 1001 response = self.app.get(
988 1002 url('repo_vcs_settings', repo_name=repo_name), status=200)
989 1003 response.mustcontain('Mercurial Settings')
990 1004
991 1005 @pytest.mark.skip_backends('hg')
992 1006 def test_hg_settings_are_created_only_for_hg_repository(
993 1007 self, autologin_user, backend, csrf_token):
994 1008 repo_name = backend.repo_name
995 1009 data = self.FORM_DATA.copy()
996 1010 data['csrf_token'] = csrf_token
997 1011 self.app.post(
998 1012 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
999 1013 settings = SettingsModel(repo=repo_name)
1000 1014 try:
1001 1015 largefiles_ui = settings.get_ui_by_section_and_key(
1002 1016 'extensions', 'largefiles')
1003 1017 assert largefiles_ui is None
1004 1018 phases_ui = settings.get_ui_by_section_and_key(
1005 1019 'phases', 'publish')
1006 1020 assert phases_ui is None
1007 1021 finally:
1008 1022 self._cleanup_repo_settings(settings)
1009 1023
1010 1024 @pytest.mark.skip_backends('hg')
1011 1025 def test_hg_settings_are_shown_only_for_hg_repository(
1012 1026 self, autologin_user, backend, csrf_token):
1013 1027 repo_name = backend.repo_name
1014 1028 response = self.app.get(
1015 1029 url('repo_vcs_settings', repo_name=repo_name), status=200)
1016 1030 response.mustcontain(no='Mercurial Settings')
1017 1031
1018 1032 @pytest.mark.skip_backends('hg')
1019 1033 def test_hg_settings_are_updated_only_for_hg_repository(
1020 1034 self, autologin_user, backend, csrf_token):
1021 1035 repo_name = backend.repo_name
1022 1036 settings = SettingsModel(repo=repo_name)
1023 1037 settings.create_ui_section_value(
1024 1038 'extensions', '', key='largefiles', active=True)
1025 1039 settings.create_ui_section_value(
1026 1040 'phases', '1', key='publish', active=True)
1027 1041
1028 1042 data = self.FORM_DATA.copy()
1029 1043 data['csrf_token'] = csrf_token
1030 1044 self.app.post(
1031 1045 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1032 1046 try:
1033 1047 largefiles_ui = settings.get_ui_by_section_and_key(
1034 1048 'extensions', 'largefiles')
1035 1049 assert largefiles_ui.ui_active is True
1036 1050 phases_ui = settings.get_ui_by_section_and_key(
1037 1051 'phases', 'publish')
1038 1052 assert phases_ui.ui_value == '1'
1039 1053 finally:
1040 1054 self._cleanup_repo_settings(settings)
1041 1055
1042 1056 def test_per_repo_svn_settings_are_displayed(
1043 1057 self, autologin_user, backend_svn, settings_util):
1044 1058 repo = backend_svn.create_repo()
1045 1059 repo_name = repo.repo_name
1046 1060 branches = [
1047 1061 settings_util.create_repo_rhodecode_ui(
1048 1062 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1049 1063 'branch_{}'.format(i))
1050 1064 for i in range(10)]
1051 1065 tags = [
1052 1066 settings_util.create_repo_rhodecode_ui(
1053 1067 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1054 1068 for i in range(10)]
1055 1069
1056 1070 response = self.app.get(
1057 1071 url('repo_vcs_settings', repo_name=repo_name), status=200)
1058 1072 assert_response = AssertResponse(response)
1059 1073 for branch in branches:
1060 1074 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1061 1075 element = assert_response.get_element(css_selector)
1062 1076 assert element.value == branch.ui_value
1063 1077 for tag in tags:
1064 1078 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1065 1079 element = assert_response.get_element(css_selector)
1066 1080 assert element.value == tag.ui_value
1067 1081
1068 1082 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1069 1083 self, autologin_user, backend_svn, settings_util):
1070 1084 repo = backend_svn.create_repo()
1071 1085 repo_name = repo.repo_name
1072 1086 response = self.app.get(
1073 1087 url('repo_vcs_settings', repo_name=repo_name), status=200)
1074 1088 response.mustcontain(no='<label>Hooks:</label>')
1075 1089 response.mustcontain(no='<label>Pull Request Settings:</label>')
1076 1090
1077 1091 def test_inherit_global_settings_value_is_saved(
1078 1092 self, autologin_user, backend, csrf_token):
1079 1093 repo_name = backend.repo_name
1080 1094 data = self.FORM_DATA.copy()
1081 1095 data['csrf_token'] = csrf_token
1082 1096 data['inherit_global_settings'] = True
1083 1097 self.app.post(
1084 1098 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1085 1099
1086 1100 settings = SettingsModel(repo=repo_name)
1087 1101 vcs_settings = VcsSettingsModel(repo=repo_name)
1088 1102 try:
1089 1103 assert vcs_settings.inherit_global_settings is True
1090 1104 finally:
1091 1105 self._cleanup_repo_settings(settings)
1092 1106
1093 1107 def test_repo_cache_is_invalidated_when_settings_are_updated(
1094 1108 self, autologin_user, backend, csrf_token):
1095 1109 repo_name = backend.repo_name
1096 1110 data = self.FORM_DATA.copy()
1097 1111 data['csrf_token'] = csrf_token
1098 1112 data['inherit_global_settings'] = True
1099 1113 settings = SettingsModel(repo=repo_name)
1100 1114
1101 1115 invalidation_patcher = mock.patch(
1102 1116 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1103 1117 with invalidation_patcher as invalidation_mock:
1104 1118 self.app.post(
1105 1119 url('repo_vcs_settings', repo_name=repo_name), data,
1106 1120 status=302)
1107 1121 try:
1108 1122 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1109 1123 finally:
1110 1124 self._cleanup_repo_settings(settings)
1111 1125
1112 1126 def test_other_settings_not_saved_inherit_global_settings_is_true(
1113 1127 self, autologin_user, backend, csrf_token):
1114 1128 repo_name = backend.repo_name
1115 1129 data = self.FORM_DATA.copy()
1116 1130 data['csrf_token'] = csrf_token
1117 1131 data['inherit_global_settings'] = True
1118 1132 self.app.post(
1119 1133 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1120 1134
1121 1135 settings = SettingsModel(repo=repo_name)
1122 1136 ui_settings = (
1123 1137 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1124 1138
1125 1139 vcs_settings = []
1126 1140 try:
1127 1141 for section, key in ui_settings:
1128 1142 ui = settings.get_ui_by_section_and_key(section, key)
1129 1143 if ui:
1130 1144 vcs_settings.append(ui)
1131 1145 vcs_settings.extend(settings.get_ui_by_section(
1132 1146 VcsSettingsModel.SVN_BRANCH_SECTION))
1133 1147 vcs_settings.extend(settings.get_ui_by_section(
1134 1148 VcsSettingsModel.SVN_TAG_SECTION))
1135 1149 for name in VcsSettingsModel.GENERAL_SETTINGS:
1136 1150 setting = settings.get_setting_by_name(name)
1137 1151 if setting:
1138 1152 vcs_settings.append(setting)
1139 1153 assert vcs_settings == []
1140 1154 finally:
1141 1155 self._cleanup_repo_settings(settings)
1142 1156
1143 1157 def test_delete_svn_branch_and_tag_patterns(
1144 1158 self, autologin_user, backend_svn, settings_util, csrf_token):
1145 1159 repo = backend_svn.create_repo()
1146 1160 repo_name = repo.repo_name
1147 1161 branch = settings_util.create_repo_rhodecode_ui(
1148 1162 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1149 1163 cleanup=False)
1150 1164 tag = settings_util.create_repo_rhodecode_ui(
1151 1165 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1152 1166 data = {
1153 1167 '_method': 'delete',
1154 1168 'csrf_token': csrf_token
1155 1169 }
1156 1170 for id_ in (branch.ui_id, tag.ui_id):
1157 1171 data['delete_svn_pattern'] = id_,
1158 1172 self.app.post(
1159 1173 url('repo_vcs_settings', repo_name=repo_name), data,
1160 1174 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1161 1175 settings = VcsSettingsModel(repo=repo_name)
1162 1176 assert settings.get_repo_svn_branch_patterns() == []
1163 1177
1164 1178 def test_delete_svn_branch_requires_repo_admin_permission(
1165 1179 self, backend_svn, user_util, settings_util, csrf_token):
1166 1180 repo = backend_svn.create_repo()
1167 1181 repo_name = repo.repo_name
1168 1182 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1169 1183 logout_user_session(self.app, csrf_token)
1170 1184 session = login_user_session(
1171 1185 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1172 1186 csrf_token = auth.get_csrf_token(session)
1173 1187 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1174 1188 branch = settings_util.create_repo_rhodecode_ui(
1175 1189 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1176 1190 cleanup=False)
1177 1191 data = {
1178 1192 '_method': 'delete',
1179 1193 'csrf_token': csrf_token,
1180 1194 'delete_svn_pattern': branch.ui_id
1181 1195 }
1182 1196 self.app.post(
1183 1197 url('repo_vcs_settings', repo_name=repo_name), data,
1184 1198 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1185 1199
1186 1200 def test_delete_svn_branch_raises_400_when_not_found(
1187 1201 self, autologin_user, backend_svn, settings_util, csrf_token):
1188 1202 repo_name = backend_svn.repo_name
1189 1203 data = {
1190 1204 '_method': 'delete',
1191 1205 'delete_svn_pattern': 123,
1192 1206 'csrf_token': csrf_token
1193 1207 }
1194 1208 self.app.post(
1195 1209 url('repo_vcs_settings', repo_name=repo_name), data,
1196 1210 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1197 1211
1198 1212 def test_delete_svn_branch_raises_400_when_no_id_specified(
1199 1213 self, autologin_user, backend_svn, settings_util, csrf_token):
1200 1214 repo_name = backend_svn.repo_name
1201 1215 data = {
1202 1216 '_method': 'delete',
1203 1217 'csrf_token': csrf_token
1204 1218 }
1205 1219 self.app.post(
1206 1220 url('repo_vcs_settings', repo_name=repo_name), data,
1207 1221 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1208 1222
1209 1223 def _cleanup_repo_settings(self, settings_model):
1210 1224 cleanup = []
1211 1225 ui_settings = (
1212 1226 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1213 1227
1214 1228 for section, key in ui_settings:
1215 1229 ui = settings_model.get_ui_by_section_and_key(section, key)
1216 1230 if ui:
1217 1231 cleanup.append(ui)
1218 1232
1219 1233 cleanup.extend(settings_model.get_ui_by_section(
1220 1234 VcsSettingsModel.INHERIT_SETTINGS))
1221 1235 cleanup.extend(settings_model.get_ui_by_section(
1222 1236 VcsSettingsModel.SVN_BRANCH_SECTION))
1223 1237 cleanup.extend(settings_model.get_ui_by_section(
1224 1238 VcsSettingsModel.SVN_TAG_SECTION))
1225 1239
1226 1240 for name in VcsSettingsModel.GENERAL_SETTINGS:
1227 1241 setting = settings_model.get_setting_by_name(name)
1228 1242 if setting:
1229 1243 cleanup.append(setting)
1230 1244
1231 1245 for object_ in cleanup:
1232 1246 Session().delete(object_)
1233 1247 Session().commit()
1234 1248
1235 1249 def assert_repo_value_equals_global_value(self, response, setting):
1236 1250 assert_response = AssertResponse(response)
1237 1251 global_css_selector = '[name={}_inherited]'.format(setting)
1238 1252 repo_css_selector = '[name={}]'.format(setting)
1239 1253 repo_element = assert_response.get_element(repo_css_selector)
1240 1254 global_element = assert_response.get_element(global_css_selector)
1241 1255 assert repo_element.value == global_element.value
1242 1256
1243 1257
1244 1258 def _get_permission_for_user(user, repo):
1245 1259 perm = UserRepoToPerm.query()\
1246 1260 .filter(UserRepoToPerm.repository ==
1247 1261 Repository.get_by_repo_name(repo))\
1248 1262 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1249 1263 .all()
1250 1264 return perm
General Comments 0
You need to be logged in to leave comments. Login now