##// END OF EJS Templates
tests: fix usage of false in test, in forms it's always with capital letter.
marcink -
r1568:b820766b default
parent child Browse files
Show More
@@ -1,611 +1,611 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Database creation, and setup module for RhodeCode Enterprise. Used for creation
23 23 of database as well as for migration operations
24 24 """
25 25
26 26 import os
27 27 import sys
28 28 import time
29 29 import uuid
30 30 import logging
31 31 import getpass
32 32 from os.path import dirname as dn, join as jn
33 33
34 34 from sqlalchemy.engine import create_engine
35 35
36 36 from rhodecode import __dbversion__
37 37 from rhodecode.model import init_model
38 38 from rhodecode.model.user import UserModel
39 39 from rhodecode.model.db import (
40 40 User, Permission, RhodeCodeUi, RhodeCodeSetting, UserToPerm,
41 41 DbMigrateVersion, RepoGroup, UserRepoGroupToPerm, CacheKey, Repository)
42 42 from rhodecode.model.meta import Session, Base
43 43 from rhodecode.model.permission import PermissionModel
44 44 from rhodecode.model.repo import RepoModel
45 45 from rhodecode.model.repo_group import RepoGroupModel
46 46 from rhodecode.model.settings import SettingsModel
47 47
48 48
49 49 log = logging.getLogger(__name__)
50 50
51 51
52 52 def notify(msg):
53 53 """
54 54 Notification for migrations messages
55 55 """
56 56 ml = len(msg) + (4 * 2)
57 57 print('\n%s\n*** %s ***\n%s' % ('*' * ml, msg, '*' * ml)).upper()
58 58
59 59
60 60 class DbManage(object):
61 61
62 62 def __init__(self, log_sql, dbconf, root, tests=False,
63 63 SESSION=None, cli_args={}):
64 64 self.dbname = dbconf.split('/')[-1]
65 65 self.tests = tests
66 66 self.root = root
67 67 self.dburi = dbconf
68 68 self.log_sql = log_sql
69 69 self.db_exists = False
70 70 self.cli_args = cli_args
71 71 self.init_db(SESSION=SESSION)
72 72 self.ask_ok = self.get_ask_ok_func(self.cli_args.get('force_ask'))
73 73
74 74 def get_ask_ok_func(self, param):
75 75 if param not in [None]:
76 76 # return a function lambda that has a default set to param
77 77 return lambda *args, **kwargs: param
78 78 else:
79 79 from rhodecode.lib.utils import ask_ok
80 80 return ask_ok
81 81
82 82 def init_db(self, SESSION=None):
83 83 if SESSION:
84 84 self.sa = SESSION
85 85 else:
86 86 # init new sessions
87 87 engine = create_engine(self.dburi, echo=self.log_sql)
88 88 init_model(engine)
89 89 self.sa = Session()
90 90
91 91 def create_tables(self, override=False):
92 92 """
93 93 Create a auth database
94 94 """
95 95
96 96 log.info("Existing database with the same name is going to be destroyed.")
97 97 log.info("Setup command will run DROP ALL command on that database.")
98 98 if self.tests:
99 99 destroy = True
100 100 else:
101 101 destroy = self.ask_ok('Are you sure that you want to destroy the old database? [y/n]')
102 102 if not destroy:
103 103 log.info('Nothing done.')
104 104 sys.exit(0)
105 105 if destroy:
106 106 Base.metadata.drop_all()
107 107
108 108 checkfirst = not override
109 109 Base.metadata.create_all(checkfirst=checkfirst)
110 110 log.info('Created tables for %s' % self.dbname)
111 111
112 112 def set_db_version(self):
113 113 ver = DbMigrateVersion()
114 114 ver.version = __dbversion__
115 115 ver.repository_id = 'rhodecode_db_migrations'
116 116 ver.repository_path = 'versions'
117 117 self.sa.add(ver)
118 118 log.info('db version set to: %s' % __dbversion__)
119 119
120 120 def run_pre_migration_tasks(self):
121 121 """
122 122 Run various tasks before actually doing migrations
123 123 """
124 124 # delete cache keys on each upgrade
125 125 total = CacheKey.query().count()
126 126 log.info("Deleting (%s) cache keys now...", total)
127 127 CacheKey.delete_all_cache()
128 128
129 129 def upgrade(self):
130 130 """
131 131 Upgrades given database schema to given revision following
132 132 all needed steps, to perform the upgrade
133 133
134 134 """
135 135
136 136 from rhodecode.lib.dbmigrate.migrate.versioning import api
137 137 from rhodecode.lib.dbmigrate.migrate.exceptions import \
138 138 DatabaseNotControlledError
139 139
140 140 if 'sqlite' in self.dburi:
141 141 print (
142 142 '********************** WARNING **********************\n'
143 143 'Make sure your version of sqlite is at least 3.7.X. \n'
144 144 'Earlier versions are known to fail on some migrations\n'
145 145 '*****************************************************\n')
146 146
147 147 upgrade = self.ask_ok(
148 148 'You are about to perform a database upgrade. Make '
149 149 'sure you have backed up your database. '
150 150 'Continue ? [y/n]')
151 151 if not upgrade:
152 152 log.info('No upgrade performed')
153 153 sys.exit(0)
154 154
155 155 repository_path = jn(dn(dn(dn(os.path.realpath(__file__)))),
156 156 'rhodecode/lib/dbmigrate')
157 157 db_uri = self.dburi
158 158
159 159 try:
160 160 curr_version = api.db_version(db_uri, repository_path)
161 161 msg = ('Found current database under version '
162 162 'control with version %s' % curr_version)
163 163
164 164 except (RuntimeError, DatabaseNotControlledError):
165 165 curr_version = 1
166 166 msg = ('Current database is not under version control. Setting '
167 167 'as version %s' % curr_version)
168 168 api.version_control(db_uri, repository_path, curr_version)
169 169
170 170 notify(msg)
171 171
172 172 self.run_pre_migration_tasks()
173 173
174 174 if curr_version == __dbversion__:
175 175 log.info('This database is already at the newest version')
176 176 sys.exit(0)
177 177
178 178 upgrade_steps = range(curr_version + 1, __dbversion__ + 1)
179 179 notify('attempting to upgrade database from '
180 180 'version %s to version %s' % (curr_version, __dbversion__))
181 181
182 182 # CALL THE PROPER ORDER OF STEPS TO PERFORM FULL UPGRADE
183 183 _step = None
184 184 for step in upgrade_steps:
185 185 notify('performing upgrade step %s' % step)
186 186 time.sleep(0.5)
187 187
188 188 api.upgrade(db_uri, repository_path, step)
189 189 self.sa.rollback()
190 190 notify('schema upgrade for step %s completed' % (step,))
191 191
192 192 _step = step
193 193
194 194 notify('upgrade to version %s successful' % _step)
195 195
196 196 def fix_repo_paths(self):
197 197 """
198 198 Fixes an old RhodeCode version path into new one without a '*'
199 199 """
200 200
201 201 paths = self.sa.query(RhodeCodeUi)\
202 202 .filter(RhodeCodeUi.ui_key == '/')\
203 203 .scalar()
204 204
205 205 paths.ui_value = paths.ui_value.replace('*', '')
206 206
207 207 try:
208 208 self.sa.add(paths)
209 209 self.sa.commit()
210 210 except Exception:
211 211 self.sa.rollback()
212 212 raise
213 213
214 214 def fix_default_user(self):
215 215 """
216 216 Fixes an old default user with some 'nicer' default values,
217 217 used mostly for anonymous access
218 218 """
219 219 def_user = self.sa.query(User)\
220 220 .filter(User.username == User.DEFAULT_USER)\
221 221 .one()
222 222
223 223 def_user.name = 'Anonymous'
224 224 def_user.lastname = 'User'
225 225 def_user.email = User.DEFAULT_USER_EMAIL
226 226
227 227 try:
228 228 self.sa.add(def_user)
229 229 self.sa.commit()
230 230 except Exception:
231 231 self.sa.rollback()
232 232 raise
233 233
234 234 def fix_settings(self):
235 235 """
236 236 Fixes rhodecode settings and adds ga_code key for google analytics
237 237 """
238 238
239 239 hgsettings3 = RhodeCodeSetting('ga_code', '')
240 240
241 241 try:
242 242 self.sa.add(hgsettings3)
243 243 self.sa.commit()
244 244 except Exception:
245 245 self.sa.rollback()
246 246 raise
247 247
248 248 def create_admin_and_prompt(self):
249 249
250 250 # defaults
251 251 defaults = self.cli_args
252 252 username = defaults.get('username')
253 253 password = defaults.get('password')
254 254 email = defaults.get('email')
255 255
256 256 if username is None:
257 257 username = raw_input('Specify admin username:')
258 258 if password is None:
259 259 password = self._get_admin_password()
260 260 if not password:
261 261 # second try
262 262 password = self._get_admin_password()
263 263 if not password:
264 264 sys.exit()
265 265 if email is None:
266 266 email = raw_input('Specify admin email:')
267 267 api_key = self.cli_args.get('api_key')
268 268 self.create_user(username, password, email, True,
269 269 strict_creation_check=False,
270 270 api_key=api_key)
271 271
272 272 def _get_admin_password(self):
273 273 password = getpass.getpass('Specify admin password '
274 274 '(min 6 chars):')
275 275 confirm = getpass.getpass('Confirm password:')
276 276
277 277 if password != confirm:
278 278 log.error('passwords mismatch')
279 279 return False
280 280 if len(password) < 6:
281 281 log.error('password is too short - use at least 6 characters')
282 282 return False
283 283
284 284 return password
285 285
286 286 def create_test_admin_and_users(self):
287 287 log.info('creating admin and regular test users')
288 288 from rhodecode.tests import TEST_USER_ADMIN_LOGIN, \
289 289 TEST_USER_ADMIN_PASS, TEST_USER_ADMIN_EMAIL, \
290 290 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, \
291 291 TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR2_LOGIN, \
292 292 TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR2_EMAIL
293 293
294 294 self.create_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,
295 295 TEST_USER_ADMIN_EMAIL, True, api_key=True)
296 296
297 297 self.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
298 298 TEST_USER_REGULAR_EMAIL, False, api_key=True)
299 299
300 300 self.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS,
301 301 TEST_USER_REGULAR2_EMAIL, False, api_key=True)
302 302
303 303 def create_ui_settings(self, repo_store_path):
304 304 """
305 305 Creates ui settings, fills out hooks
306 306 and disables dotencode
307 307 """
308 308 settings_model = SettingsModel(sa=self.sa)
309 309 from rhodecode.lib.vcs.backends.hg import largefiles_store
310 310 from rhodecode.lib.vcs.backends.git import lfs_store
311 311
312 312 # Build HOOKS
313 313 hooks = [
314 314 (RhodeCodeUi.HOOK_REPO_SIZE, 'python:vcsserver.hooks.repo_size'),
315 315
316 316 # HG
317 317 (RhodeCodeUi.HOOK_PRE_PULL, 'python:vcsserver.hooks.pre_pull'),
318 318 (RhodeCodeUi.HOOK_PULL, 'python:vcsserver.hooks.log_pull_action'),
319 319 (RhodeCodeUi.HOOK_PRE_PUSH, 'python:vcsserver.hooks.pre_push'),
320 320 (RhodeCodeUi.HOOK_PRETX_PUSH, 'python:vcsserver.hooks.pre_push'),
321 321 (RhodeCodeUi.HOOK_PUSH, 'python:vcsserver.hooks.log_push_action'),
322 322
323 323 ]
324 324
325 325 for key, value in hooks:
326 326 hook_obj = settings_model.get_ui_by_key(key)
327 327 hooks2 = hook_obj if hook_obj else RhodeCodeUi()
328 328 hooks2.ui_section = 'hooks'
329 329 hooks2.ui_key = key
330 330 hooks2.ui_value = value
331 331 self.sa.add(hooks2)
332 332
333 333 # enable largefiles
334 334 largefiles = RhodeCodeUi()
335 335 largefiles.ui_section = 'extensions'
336 336 largefiles.ui_key = 'largefiles'
337 337 largefiles.ui_value = ''
338 338 self.sa.add(largefiles)
339 339
340 340 # set default largefiles cache dir, defaults to
341 341 # /repo_store_location/.cache/largefiles
342 342 largefiles = RhodeCodeUi()
343 343 largefiles.ui_section = 'largefiles'
344 344 largefiles.ui_key = 'usercache'
345 345 largefiles.ui_value = largefiles_store(repo_store_path)
346 346
347 347 self.sa.add(largefiles)
348 348
349 349 # set default lfs cache dir, defaults to
350 350 # /repo_store_location/.cache/lfs_store
351 351 lfsstore = RhodeCodeUi()
352 352 lfsstore.ui_section = 'vcs_git_lfs'
353 353 lfsstore.ui_key = 'store_location'
354 354 lfsstore.ui_value = lfs_store(repo_store_path)
355 355
356 356 self.sa.add(lfsstore)
357 357
358 358 # enable hgsubversion disabled by default
359 359 hgsubversion = RhodeCodeUi()
360 360 hgsubversion.ui_section = 'extensions'
361 361 hgsubversion.ui_key = 'hgsubversion'
362 362 hgsubversion.ui_value = ''
363 363 hgsubversion.ui_active = False
364 364 self.sa.add(hgsubversion)
365 365
366 366 # enable hggit disabled by default
367 367 hggit = RhodeCodeUi()
368 368 hggit.ui_section = 'extensions'
369 369 hggit.ui_key = 'hggit'
370 370 hggit.ui_value = ''
371 371 hggit.ui_active = False
372 372 self.sa.add(hggit)
373 373
374 374 # set svn branch defaults
375 375 branches = ["/branches/*", "/trunk"]
376 376 tags = ["/tags/*"]
377 377
378 378 for branch in branches:
379 379 settings_model.create_ui_section_value(
380 380 RhodeCodeUi.SVN_BRANCH_ID, branch)
381 381
382 382 for tag in tags:
383 383 settings_model.create_ui_section_value(RhodeCodeUi.SVN_TAG_ID, tag)
384 384
385 385 def create_auth_plugin_options(self, skip_existing=False):
386 386 """
387 387 Create default auth plugin settings, and make it active
388 388
389 389 :param skip_existing:
390 390 """
391 391
392 392 for k, v, t in [('auth_plugins', 'egg:rhodecode-enterprise-ce#rhodecode', 'list'),
393 393 ('auth_rhodecode_enabled', 'True', 'bool')]:
394 394 if (skip_existing and
395 395 SettingsModel().get_setting_by_name(k) is not None):
396 396 log.debug('Skipping option %s' % k)
397 397 continue
398 398 setting = RhodeCodeSetting(k, v, t)
399 399 self.sa.add(setting)
400 400
401 401 def create_default_options(self, skip_existing=False):
402 402 """Creates default settings"""
403 403
404 404 for k, v, t in [
405 405 ('default_repo_enable_locking', False, 'bool'),
406 406 ('default_repo_enable_downloads', False, 'bool'),
407 407 ('default_repo_enable_statistics', False, 'bool'),
408 408 ('default_repo_private', False, 'bool'),
409 409 ('default_repo_type', 'hg', 'unicode')]:
410 410
411 411 if (skip_existing and
412 412 SettingsModel().get_setting_by_name(k) is not None):
413 413 log.debug('Skipping option %s' % k)
414 414 continue
415 415 setting = RhodeCodeSetting(k, v, t)
416 416 self.sa.add(setting)
417 417
418 418 def fixup_groups(self):
419 419 def_usr = User.get_default_user()
420 420 for g in RepoGroup.query().all():
421 421 g.group_name = g.get_new_name(g.name)
422 422 self.sa.add(g)
423 423 # get default perm
424 424 default = UserRepoGroupToPerm.query()\
425 425 .filter(UserRepoGroupToPerm.group == g)\
426 426 .filter(UserRepoGroupToPerm.user == def_usr)\
427 427 .scalar()
428 428
429 429 if default is None:
430 430 log.debug('missing default permission for group %s adding' % g)
431 431 perm_obj = RepoGroupModel()._create_default_perms(g)
432 432 self.sa.add(perm_obj)
433 433
434 434 def reset_permissions(self, username):
435 435 """
436 436 Resets permissions to default state, useful when old systems had
437 437 bad permissions, we must clean them up
438 438
439 439 :param username:
440 440 """
441 441 default_user = User.get_by_username(username)
442 442 if not default_user:
443 443 return
444 444
445 445 u2p = UserToPerm.query()\
446 446 .filter(UserToPerm.user == default_user).all()
447 447 fixed = False
448 448 if len(u2p) != len(Permission.DEFAULT_USER_PERMISSIONS):
449 449 for p in u2p:
450 450 Session().delete(p)
451 451 fixed = True
452 452 self.populate_default_permissions()
453 453 return fixed
454 454
455 455 def update_repo_info(self):
456 456 RepoModel.update_repoinfo()
457 457
458 458 def config_prompt(self, test_repo_path='', retries=3):
459 459 defaults = self.cli_args
460 460 _path = defaults.get('repos_location')
461 461 if retries == 3:
462 462 log.info('Setting up repositories config')
463 463
464 464 if _path is not None:
465 465 path = _path
466 466 elif not self.tests and not test_repo_path:
467 467 path = raw_input(
468 468 'Enter a valid absolute path to store repositories. '
469 469 'All repositories in that path will be added automatically:'
470 470 )
471 471 else:
472 472 path = test_repo_path
473 473 path_ok = True
474 474
475 475 # check proper dir
476 476 if not os.path.isdir(path):
477 477 path_ok = False
478 478 log.error('Given path %s is not a valid directory' % (path,))
479 479
480 480 elif not os.path.isabs(path):
481 481 path_ok = False
482 482 log.error('Given path %s is not an absolute path' % (path,))
483 483
484 484 # check if path is at least readable.
485 485 if not os.access(path, os.R_OK):
486 486 path_ok = False
487 487 log.error('Given path %s is not readable' % (path,))
488 488
489 489 # check write access, warn user about non writeable paths
490 490 elif not os.access(path, os.W_OK) and path_ok:
491 491 log.warning('No write permission to given path %s' % (path,))
492 492
493 493 q = ('Given path %s is not writeable, do you want to '
494 494 'continue with read only mode ? [y/n]' % (path,))
495 495 if not self.ask_ok(q):
496 496 log.error('Canceled by user')
497 497 sys.exit(-1)
498 498
499 499 if retries == 0:
500 500 sys.exit('max retries reached')
501 501 if not path_ok:
502 502 retries -= 1
503 503 return self.config_prompt(test_repo_path, retries)
504 504
505 505 real_path = os.path.normpath(os.path.realpath(path))
506 506
507 507 if real_path != os.path.normpath(path):
508 508 q = ('Path looks like a symlink, RhodeCode Enterprise will store '
509 509 'given path as %s ? [y/n]') % (real_path,)
510 510 if not self.ask_ok(q):
511 511 log.error('Canceled by user')
512 512 sys.exit(-1)
513 513
514 514 return real_path
515 515
516 516 def create_settings(self, path):
517 517
518 518 self.create_ui_settings(path)
519 519
520 520 ui_config = [
521 ('web', 'push_ssl', 'false'),
521 ('web', 'push_ssl', 'False'),
522 522 ('web', 'allow_archive', 'gz zip bz2'),
523 523 ('web', 'allow_push', '*'),
524 524 ('web', 'baseurl', '/'),
525 525 ('paths', '/', path),
526 ('phases', 'publish', 'true')
526 ('phases', 'publish', 'True')
527 527 ]
528 528 for section, key, value in ui_config:
529 529 ui_conf = RhodeCodeUi()
530 530 setattr(ui_conf, 'ui_section', section)
531 531 setattr(ui_conf, 'ui_key', key)
532 532 setattr(ui_conf, 'ui_value', value)
533 533 self.sa.add(ui_conf)
534 534
535 535 # rhodecode app settings
536 536 settings = [
537 537 ('realm', 'RhodeCode', 'unicode'),
538 538 ('title', '', 'unicode'),
539 539 ('pre_code', '', 'unicode'),
540 540 ('post_code', '', 'unicode'),
541 541 ('show_public_icon', True, 'bool'),
542 542 ('show_private_icon', True, 'bool'),
543 543 ('stylify_metatags', False, 'bool'),
544 544 ('dashboard_items', 100, 'int'),
545 545 ('admin_grid_items', 25, 'int'),
546 546 ('show_version', True, 'bool'),
547 547 ('use_gravatar', False, 'bool'),
548 548 ('gravatar_url', User.DEFAULT_GRAVATAR_URL, 'unicode'),
549 549 ('clone_uri_tmpl', Repository.DEFAULT_CLONE_URI, 'unicode'),
550 550 ('support_url', '', 'unicode'),
551 551 ('update_url', RhodeCodeSetting.DEFAULT_UPDATE_URL, 'unicode'),
552 552 ('show_revision_number', True, 'bool'),
553 553 ('show_sha_length', 12, 'int'),
554 554 ]
555 555
556 556 for key, val, type_ in settings:
557 557 sett = RhodeCodeSetting(key, val, type_)
558 558 self.sa.add(sett)
559 559
560 560 self.create_auth_plugin_options()
561 561 self.create_default_options()
562 562
563 563 log.info('created ui config')
564 564
565 565 def create_user(self, username, password, email='', admin=False,
566 566 strict_creation_check=True, api_key=None):
567 567 log.info('creating user %s' % username)
568 568 user = UserModel().create_or_update(
569 569 username, password, email, firstname='RhodeCode', lastname='Admin',
570 570 active=True, admin=admin, extern_type="rhodecode",
571 571 strict_creation_check=strict_creation_check)
572 572
573 573 if api_key:
574 574 log.info('setting a provided api key for the user %s', username)
575 575 from rhodecode.model.auth_token import AuthTokenModel
576 576 AuthTokenModel().create(
577 577 user=user, description='BUILTIN TOKEN')
578 578
579 579 def create_default_user(self):
580 580 log.info('creating default user')
581 581 # create default user for handling default permissions.
582 582 user = UserModel().create_or_update(username=User.DEFAULT_USER,
583 583 password=str(uuid.uuid1())[:20],
584 584 email=User.DEFAULT_USER_EMAIL,
585 585 firstname='Anonymous',
586 586 lastname='User',
587 587 strict_creation_check=False)
588 588 # based on configuration options activate/deactive this user which
589 589 # controlls anonymous access
590 590 if self.cli_args.get('public_access') is False:
591 591 log.info('Public access disabled')
592 592 user.active = False
593 593 Session().add(user)
594 594 Session().commit()
595 595
596 596 def create_permissions(self):
597 597 """
598 598 Creates all permissions defined in the system
599 599 """
600 600 # module.(access|create|change|delete)_[name]
601 601 # module.(none|read|write|admin)
602 602 log.info('creating permissions')
603 603 PermissionModel(self.sa).create_permissions()
604 604
605 605 def populate_default_permissions(self):
606 606 """
607 607 Populate default permissions. It will create only the default
608 608 permissions that are missing, and not alter already defined ones
609 609 """
610 610 log.info('creating default user permissions')
611 611 PermissionModel(self.sa).create_default_user_permissions(user=User.DEFAULT_USER)
@@ -1,1250 +1,1250 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urllib
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.lib import auth
27 27 from rhodecode.lib.utils2 import safe_str, str2bool
28 28 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
29 29 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
30 30 Permission
31 31 from rhodecode.model.meta import Session
32 32 from rhodecode.model.repo import RepoModel
33 33 from rhodecode.model.repo_group import RepoGroupModel
34 34 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
35 35 from rhodecode.model.user import UserModel
36 36 from rhodecode.tests import (
37 37 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
39 39 logout_user_session)
40 40 from rhodecode.tests.fixture import Fixture, error_function
41 41 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
42 42
43 43 fixture = Fixture()
44 44
45 45
46 46 @pytest.mark.usefixtures("app")
47 47 class TestAdminRepos:
48 48
49 49 def test_index(self):
50 50 self.app.get(url('repos'))
51 51
52 52 def test_create_page_restricted(self, autologin_user, backend):
53 53 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
54 54 response = self.app.get(url('new_repo'), status=200)
55 55 assert_response = AssertResponse(response)
56 56 element = assert_response.get_element('#repo_type')
57 57 assert element.text_content() == '\ngit\n'
58 58
59 59 def test_create_page_non_restricted(self, autologin_user, backend):
60 60 response = self.app.get(url('new_repo'), status=200)
61 61 assert_response = AssertResponse(response)
62 62 assert_response.element_contains('#repo_type', 'git')
63 63 assert_response.element_contains('#repo_type', 'svn')
64 64 assert_response.element_contains('#repo_type', 'hg')
65 65
66 66 @pytest.mark.parametrize("suffix", [u'', u''], ids=['', 'non-ascii'])
67 67 def test_create(self, autologin_user, backend, suffix, csrf_token):
68 68 repo_name_unicode = backend.new_repo_name(suffix=suffix)
69 69 repo_name = repo_name_unicode.encode('utf8')
70 70 description_unicode = u'description for newly created repo' + suffix
71 71 description = description_unicode.encode('utf8')
72 72 self.app.post(
73 73 url('repos'),
74 74 fixture._get_repo_create_params(
75 75 repo_private=False,
76 76 repo_name=repo_name,
77 77 repo_type=backend.alias,
78 78 repo_description=description,
79 79 csrf_token=csrf_token),
80 80 status=302
81 81 )
82 82
83 83 self.assert_repository_is_created_correctly(
84 84 repo_name, description, backend)
85 85
86 86 def test_create_numeric(self, autologin_user, backend, csrf_token):
87 87 numeric_repo = '1234'
88 88 repo_name = numeric_repo
89 89 description = 'description for newly created repo' + numeric_repo
90 90 self.app.post(
91 91 url('repos'),
92 92 fixture._get_repo_create_params(
93 93 repo_private=False,
94 94 repo_name=repo_name,
95 95 repo_type=backend.alias,
96 96 repo_description=description,
97 97 csrf_token=csrf_token))
98 98
99 99 self.assert_repository_is_created_correctly(
100 100 repo_name, description, backend)
101 101
102 102 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
103 103 def test_create_in_group(
104 104 self, autologin_user, backend, suffix, csrf_token):
105 105 # create GROUP
106 106 group_name = 'sometest_%s' % backend.alias
107 107 gr = RepoGroupModel().create(group_name=group_name,
108 108 group_description='test',
109 109 owner=TEST_USER_ADMIN_LOGIN)
110 110 Session().commit()
111 111
112 112 repo_name = u'ingroup' + suffix
113 113 repo_name_full = RepoGroup.url_sep().join(
114 114 [group_name, repo_name])
115 115 description = u'description for newly created repo'
116 116 self.app.post(
117 117 url('repos'),
118 118 fixture._get_repo_create_params(
119 119 repo_private=False,
120 120 repo_name=safe_str(repo_name),
121 121 repo_type=backend.alias,
122 122 repo_description=description,
123 123 repo_group=gr.group_id,
124 124 csrf_token=csrf_token))
125 125
126 126 # TODO: johbo: Cleanup work to fixture
127 127 try:
128 128 self.assert_repository_is_created_correctly(
129 129 repo_name_full, description, backend)
130 130
131 131 new_repo = RepoModel().get_by_repo_name(repo_name_full)
132 132 inherited_perms = UserRepoToPerm.query().filter(
133 133 UserRepoToPerm.repository_id == new_repo.repo_id).all()
134 134 assert len(inherited_perms) == 1
135 135 finally:
136 136 RepoModel().delete(repo_name_full)
137 137 RepoGroupModel().delete(group_name)
138 138 Session().commit()
139 139
140 140 def test_create_in_group_numeric(
141 141 self, autologin_user, backend, csrf_token):
142 142 # create GROUP
143 143 group_name = 'sometest_%s' % backend.alias
144 144 gr = RepoGroupModel().create(group_name=group_name,
145 145 group_description='test',
146 146 owner=TEST_USER_ADMIN_LOGIN)
147 147 Session().commit()
148 148
149 149 repo_name = '12345'
150 150 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
151 151 description = 'description for newly created repo'
152 152 self.app.post(
153 153 url('repos'),
154 154 fixture._get_repo_create_params(
155 155 repo_private=False,
156 156 repo_name=repo_name,
157 157 repo_type=backend.alias,
158 158 repo_description=description,
159 159 repo_group=gr.group_id,
160 160 csrf_token=csrf_token))
161 161
162 162 # TODO: johbo: Cleanup work to fixture
163 163 try:
164 164 self.assert_repository_is_created_correctly(
165 165 repo_name_full, description, backend)
166 166
167 167 new_repo = RepoModel().get_by_repo_name(repo_name_full)
168 168 inherited_perms = UserRepoToPerm.query()\
169 169 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
170 170 assert len(inherited_perms) == 1
171 171 finally:
172 172 RepoModel().delete(repo_name_full)
173 173 RepoGroupModel().delete(group_name)
174 174 Session().commit()
175 175
176 176 def test_create_in_group_without_needed_permissions(self, backend):
177 177 session = login_user_session(
178 178 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
179 179 csrf_token = auth.get_csrf_token(session)
180 180 # revoke
181 181 user_model = UserModel()
182 182 # disable fork and create on default user
183 183 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
184 184 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
185 185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
186 186 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
187 187
188 188 # disable on regular user
189 189 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
190 190 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
191 191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
192 192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
193 193 Session().commit()
194 194
195 195 # create GROUP
196 196 group_name = 'reg_sometest_%s' % backend.alias
197 197 gr = RepoGroupModel().create(group_name=group_name,
198 198 group_description='test',
199 199 owner=TEST_USER_ADMIN_LOGIN)
200 200 Session().commit()
201 201
202 202 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
203 203 gr_allowed = RepoGroupModel().create(
204 204 group_name=group_name_allowed,
205 205 group_description='test',
206 206 owner=TEST_USER_REGULAR_LOGIN)
207 207 Session().commit()
208 208
209 209 repo_name = 'ingroup'
210 210 description = 'description for newly created repo'
211 211 response = self.app.post(
212 212 url('repos'),
213 213 fixture._get_repo_create_params(
214 214 repo_private=False,
215 215 repo_name=repo_name,
216 216 repo_type=backend.alias,
217 217 repo_description=description,
218 218 repo_group=gr.group_id,
219 219 csrf_token=csrf_token))
220 220
221 221 response.mustcontain('Invalid value')
222 222
223 223 # user is allowed to create in this group
224 224 repo_name = 'ingroup'
225 225 repo_name_full = RepoGroup.url_sep().join(
226 226 [group_name_allowed, repo_name])
227 227 description = 'description for newly created repo'
228 228 response = self.app.post(
229 229 url('repos'),
230 230 fixture._get_repo_create_params(
231 231 repo_private=False,
232 232 repo_name=repo_name,
233 233 repo_type=backend.alias,
234 234 repo_description=description,
235 235 repo_group=gr_allowed.group_id,
236 236 csrf_token=csrf_token))
237 237
238 238 # TODO: johbo: Cleanup in pytest fixture
239 239 try:
240 240 self.assert_repository_is_created_correctly(
241 241 repo_name_full, description, backend)
242 242
243 243 new_repo = RepoModel().get_by_repo_name(repo_name_full)
244 244 inherited_perms = UserRepoToPerm.query().filter(
245 245 UserRepoToPerm.repository_id == new_repo.repo_id).all()
246 246 assert len(inherited_perms) == 1
247 247
248 248 assert repo_on_filesystem(repo_name_full)
249 249 finally:
250 250 RepoModel().delete(repo_name_full)
251 251 RepoGroupModel().delete(group_name)
252 252 RepoGroupModel().delete(group_name_allowed)
253 253 Session().commit()
254 254
255 255 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
256 256 csrf_token):
257 257 # create GROUP
258 258 group_name = 'sometest_%s' % backend.alias
259 259 gr = RepoGroupModel().create(group_name=group_name,
260 260 group_description='test',
261 261 owner=TEST_USER_ADMIN_LOGIN)
262 262 perm = Permission.get_by_key('repository.write')
263 263 RepoGroupModel().grant_user_permission(
264 264 gr, TEST_USER_REGULAR_LOGIN, perm)
265 265
266 266 # add repo permissions
267 267 Session().commit()
268 268
269 269 repo_name = 'ingroup_inherited_%s' % backend.alias
270 270 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
271 271 description = 'description for newly created repo'
272 272 self.app.post(
273 273 url('repos'),
274 274 fixture._get_repo_create_params(
275 275 repo_private=False,
276 276 repo_name=repo_name,
277 277 repo_type=backend.alias,
278 278 repo_description=description,
279 279 repo_group=gr.group_id,
280 280 repo_copy_permissions=True,
281 281 csrf_token=csrf_token))
282 282
283 283 # TODO: johbo: Cleanup to pytest fixture
284 284 try:
285 285 self.assert_repository_is_created_correctly(
286 286 repo_name_full, description, backend)
287 287 except Exception:
288 288 RepoGroupModel().delete(group_name)
289 289 Session().commit()
290 290 raise
291 291
292 292 # check if inherited permissions are applied
293 293 new_repo = RepoModel().get_by_repo_name(repo_name_full)
294 294 inherited_perms = UserRepoToPerm.query().filter(
295 295 UserRepoToPerm.repository_id == new_repo.repo_id).all()
296 296 assert len(inherited_perms) == 2
297 297
298 298 assert TEST_USER_REGULAR_LOGIN in [
299 299 x.user.username for x in inherited_perms]
300 300 assert 'repository.write' in [
301 301 x.permission.permission_name for x in inherited_perms]
302 302
303 303 RepoModel().delete(repo_name_full)
304 304 RepoGroupModel().delete(group_name)
305 305 Session().commit()
306 306
307 307 @pytest.mark.xfail_backends(
308 308 "git", "hg", reason="Missing reposerver support")
309 309 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
310 310 csrf_token):
311 311 source_repo = backend.create_repo(number_of_commits=2)
312 312 source_repo_name = source_repo.repo_name
313 313 reposerver.serve(source_repo.scm_instance())
314 314
315 315 repo_name = backend.new_repo_name()
316 316 response = self.app.post(
317 317 url('repos'),
318 318 fixture._get_repo_create_params(
319 319 repo_private=False,
320 320 repo_name=repo_name,
321 321 repo_type=backend.alias,
322 322 repo_description='',
323 323 clone_uri=reposerver.url,
324 324 csrf_token=csrf_token),
325 325 status=302)
326 326
327 327 # Should be redirected to the creating page
328 328 response.mustcontain('repo_creating')
329 329
330 330 # Expecting that both repositories have same history
331 331 source_repo = RepoModel().get_by_repo_name(source_repo_name)
332 332 source_vcs = source_repo.scm_instance()
333 333 repo = RepoModel().get_by_repo_name(repo_name)
334 334 repo_vcs = repo.scm_instance()
335 335 assert source_vcs[0].message == repo_vcs[0].message
336 336 assert source_vcs.count() == repo_vcs.count()
337 337 assert source_vcs.commit_ids == repo_vcs.commit_ids
338 338
339 339 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
340 340 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
341 341 csrf_token):
342 342 repo_name = backend.new_repo_name()
343 343 description = 'description for newly created repo'
344 344 response = self.app.post(
345 345 url('repos'),
346 346 fixture._get_repo_create_params(
347 347 repo_private=False,
348 348 repo_name=repo_name,
349 349 repo_type=backend.alias,
350 350 repo_description=description,
351 351 clone_uri='http://repo.invalid/repo',
352 352 csrf_token=csrf_token))
353 353 response.mustcontain('invalid clone url')
354 354
355 355 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
356 356 def test_create_remote_repo_wrong_clone_uri_hg_svn(
357 357 self, autologin_user, backend, csrf_token):
358 358 repo_name = backend.new_repo_name()
359 359 description = 'description for newly created repo'
360 360 response = self.app.post(
361 361 url('repos'),
362 362 fixture._get_repo_create_params(
363 363 repo_private=False,
364 364 repo_name=repo_name,
365 365 repo_type=backend.alias,
366 366 repo_description=description,
367 367 clone_uri='svn+http://svn.invalid/repo',
368 368 csrf_token=csrf_token))
369 369 response.mustcontain('invalid clone url')
370 370
371 371 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
372 372 def test_delete(self, autologin_user, backend, suffix, csrf_token):
373 373 repo = backend.create_repo(name_suffix=suffix)
374 374 repo_name = repo.repo_name
375 375
376 376 response = self.app.post(url('repo', repo_name=repo_name),
377 377 params={'_method': 'delete',
378 378 'csrf_token': csrf_token})
379 379 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
380 380 response.follow()
381 381
382 382 # check if repo was deleted from db
383 383 assert RepoModel().get_by_repo_name(repo_name) is None
384 384 assert not repo_on_filesystem(repo_name)
385 385
386 386 def test_show(self, autologin_user, backend):
387 387 self.app.get(url('repo', repo_name=backend.repo_name))
388 388
389 389 def test_edit(self, backend, autologin_user):
390 390 self.app.get(url('edit_repo', repo_name=backend.repo_name))
391 391
392 392 def test_edit_accessible_when_missing_requirements(
393 393 self, backend_hg, autologin_user):
394 394 scm_patcher = mock.patch.object(
395 395 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
396 396 with scm_patcher:
397 397 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
398 398
399 399 def test_set_private_flag_sets_default_to_none(
400 400 self, autologin_user, backend, csrf_token):
401 401 # initially repository perm should be read
402 402 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
403 403 assert len(perm) == 1
404 404 assert perm[0].permission.permission_name == 'repository.read'
405 405 assert not backend.repo.private
406 406
407 407 response = self.app.post(
408 408 url('repo', repo_name=backend.repo_name),
409 409 fixture._get_repo_create_params(
410 410 repo_private=1,
411 411 repo_name=backend.repo_name,
412 412 repo_type=backend.alias,
413 413 user=TEST_USER_ADMIN_LOGIN,
414 414 _method='put',
415 415 csrf_token=csrf_token))
416 416 assert_session_flash(
417 417 response,
418 418 msg='Repository %s updated successfully' % (backend.repo_name))
419 419 assert backend.repo.private
420 420
421 421 # now the repo default permission should be None
422 422 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
423 423 assert len(perm) == 1
424 424 assert perm[0].permission.permission_name == 'repository.none'
425 425
426 426 response = self.app.post(
427 427 url('repo', repo_name=backend.repo_name),
428 428 fixture._get_repo_create_params(
429 429 repo_private=False,
430 430 repo_name=backend.repo_name,
431 431 repo_type=backend.alias,
432 432 user=TEST_USER_ADMIN_LOGIN,
433 433 _method='put',
434 434 csrf_token=csrf_token))
435 435 assert_session_flash(
436 436 response,
437 437 msg='Repository %s updated successfully' % (backend.repo_name))
438 438 assert not backend.repo.private
439 439
440 440 # we turn off private now the repo default permission should stay None
441 441 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
442 442 assert len(perm) == 1
443 443 assert perm[0].permission.permission_name == 'repository.none'
444 444
445 445 # update this permission back
446 446 perm[0].permission = Permission.get_by_key('repository.read')
447 447 Session().add(perm[0])
448 448 Session().commit()
449 449
450 450 def test_default_user_cannot_access_private_repo_in_a_group(
451 451 self, autologin_user, user_util, backend, csrf_token):
452 452
453 453 group = user_util.create_repo_group()
454 454
455 455 repo = backend.create_repo(
456 456 repo_private=True, repo_group=group, repo_copy_permissions=True)
457 457
458 458 permissions = _get_permission_for_user(
459 459 user='default', repo=repo.repo_name)
460 460 assert len(permissions) == 1
461 461 assert permissions[0].permission.permission_name == 'repository.none'
462 462 assert permissions[0].repository.private is True
463 463
464 464 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
465 465 repo = backend.repo
466 466 response = self.app.get(
467 467 url('edit_repo_advanced', repo_name=backend.repo_name))
468 468 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
469 469 response.mustcontain(no=[opt])
470 470
471 471 def test_set_fork_of_target_repo(
472 472 self, autologin_user, backend, csrf_token):
473 473 target_repo = 'target_%s' % backend.alias
474 474 fixture.create_repo(target_repo, repo_type=backend.alias)
475 475 repo2 = Repository.get_by_repo_name(target_repo)
476 476 response = self.app.post(
477 477 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
478 478 params={'id_fork_of': repo2.repo_id, '_method': 'put',
479 479 'csrf_token': csrf_token})
480 480 repo = Repository.get_by_repo_name(backend.repo_name)
481 481 repo2 = Repository.get_by_repo_name(target_repo)
482 482 assert_session_flash(
483 483 response,
484 484 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
485 485
486 486 assert repo.fork == repo2
487 487 response = response.follow()
488 488 # check if given repo is selected
489 489
490 490 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
491 491 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
492 492
493 493 response.mustcontain(opt)
494 494
495 495 fixture.destroy_repo(target_repo, forks='detach')
496 496
497 497 @pytest.mark.backends("hg", "git")
498 498 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
499 499 csrf_token):
500 500 TARGET_REPO_MAP = {
501 501 'git': {
502 502 'type': 'hg',
503 503 'repo_name': HG_REPO},
504 504 'hg': {
505 505 'type': 'git',
506 506 'repo_name': GIT_REPO},
507 507 }
508 508 target_repo = TARGET_REPO_MAP[backend.alias]
509 509
510 510 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
511 511 response = self.app.post(
512 512 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
513 513 params={'id_fork_of': repo2.repo_id, '_method': 'put',
514 514 'csrf_token': csrf_token})
515 515 assert_session_flash(
516 516 response,
517 517 'Cannot set repository as fork of repository with other type')
518 518
519 519 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
520 520 # mark it as None
521 521 response = self.app.post(
522 522 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
523 523 params={'id_fork_of': None, '_method': 'put',
524 524 'csrf_token': csrf_token})
525 525 assert_session_flash(
526 526 response,
527 527 'Marked repo %s as fork of %s'
528 528 % (backend.repo_name, "Nothing"))
529 529 assert backend.repo.fork is None
530 530
531 531 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
532 532 repo = Repository.get_by_repo_name(backend.repo_name)
533 533 response = self.app.post(
534 534 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
535 535 params={'id_fork_of': repo.repo_id, '_method': 'put',
536 536 'csrf_token': csrf_token})
537 537 assert_session_flash(
538 538 response, 'An error occurred during this operation')
539 539
540 540 def test_create_on_top_level_without_permissions(self, backend):
541 541 session = login_user_session(
542 542 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
543 543 csrf_token = auth.get_csrf_token(session)
544 544
545 545 # revoke
546 546 user_model = UserModel()
547 547 # disable fork and create on default user
548 548 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
549 549 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
550 550 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
551 551 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
552 552
553 553 # disable on regular user
554 554 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
555 555 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
556 556 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
557 557 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
558 558 Session().commit()
559 559
560 560 repo_name = backend.new_repo_name()
561 561 description = 'description for newly created repo'
562 562 response = self.app.post(
563 563 url('repos'),
564 564 fixture._get_repo_create_params(
565 565 repo_private=False,
566 566 repo_name=repo_name,
567 567 repo_type=backend.alias,
568 568 repo_description=description,
569 569 csrf_token=csrf_token))
570 570
571 571 response.mustcontain(
572 572 u"You do not have the permission to store repositories in "
573 573 u"the root location.")
574 574
575 575 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
576 576 def test_create_repo_when_filesystem_op_fails(
577 577 self, autologin_user, backend, csrf_token):
578 578 repo_name = backend.new_repo_name()
579 579 description = 'description for newly created repo'
580 580
581 581 response = self.app.post(
582 582 url('repos'),
583 583 fixture._get_repo_create_params(
584 584 repo_private=False,
585 585 repo_name=repo_name,
586 586 repo_type=backend.alias,
587 587 repo_description=description,
588 588 csrf_token=csrf_token))
589 589
590 590 assert_session_flash(
591 591 response, 'Error creating repository %s' % repo_name)
592 592 # repo must not be in db
593 593 assert backend.repo is None
594 594 # repo must not be in filesystem !
595 595 assert not repo_on_filesystem(repo_name)
596 596
597 597 def assert_repository_is_created_correctly(
598 598 self, repo_name, description, backend):
599 599 repo_name_utf8 = repo_name.encode('utf-8')
600 600
601 601 # run the check page that triggers the flash message
602 602 response = self.app.get(url('repo_check_home', repo_name=repo_name))
603 603 assert response.json == {u'result': True}
604 604 assert_session_flash(
605 605 response,
606 606 u'Created repository <a href="/%s">%s</a>'
607 607 % (urllib.quote(repo_name_utf8), repo_name))
608 608
609 609 # test if the repo was created in the database
610 610 new_repo = RepoModel().get_by_repo_name(repo_name)
611 611
612 612 assert new_repo.repo_name == repo_name
613 613 assert new_repo.description == description
614 614
615 615 # test if the repository is visible in the list ?
616 616 response = self.app.get(url('summary_home', repo_name=repo_name))
617 617 response.mustcontain(repo_name)
618 618 response.mustcontain(backend.alias)
619 619
620 620 assert repo_on_filesystem(repo_name)
621 621
622 622
623 623 @pytest.mark.usefixtures("app")
624 624 class TestVcsSettings(object):
625 625 FORM_DATA = {
626 626 'inherit_global_settings': False,
627 627 'hooks_changegroup_repo_size': False,
628 628 'hooks_changegroup_push_logger': False,
629 629 'hooks_outgoing_pull_logger': False,
630 630 'extensions_largefiles': False,
631 'phases_publish': 'false',
631 'phases_publish': 'False',
632 632 'rhodecode_pr_merge_enabled': False,
633 633 'rhodecode_use_outdated_comments': False,
634 634 'new_svn_branch': '',
635 635 'new_svn_tag': ''
636 636 }
637 637
638 638 @pytest.mark.skip_backends('svn')
639 639 def test_global_settings_initial_values(self, autologin_user, backend):
640 640 repo_name = backend.repo_name
641 641 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
642 642
643 643 expected_settings = (
644 644 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
645 645 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
646 646 'hooks_outgoing_pull_logger'
647 647 )
648 648 for setting in expected_settings:
649 649 self.assert_repo_value_equals_global_value(response, setting)
650 650
651 651 def test_show_settings_requires_repo_admin_permission(
652 652 self, backend, user_util, settings_util):
653 653 repo = backend.create_repo()
654 654 repo_name = repo.repo_name
655 655 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
656 656 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
657 657 login_user_session(
658 658 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
659 659 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
660 660
661 661 def test_inherit_global_settings_flag_is_true_by_default(
662 662 self, autologin_user, backend):
663 663 repo_name = backend.repo_name
664 664 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
665 665
666 666 assert_response = AssertResponse(response)
667 667 element = assert_response.get_element('#inherit_global_settings')
668 668 assert element.checked
669 669
670 670 @pytest.mark.parametrize('checked_value', [True, False])
671 671 def test_inherit_global_settings_value(
672 672 self, autologin_user, backend, checked_value, settings_util):
673 673 repo = backend.create_repo()
674 674 repo_name = repo.repo_name
675 675 settings_util.create_repo_rhodecode_setting(
676 676 repo, 'inherit_vcs_settings', checked_value, 'bool')
677 677 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
678 678
679 679 assert_response = AssertResponse(response)
680 680 element = assert_response.get_element('#inherit_global_settings')
681 681 assert element.checked == checked_value
682 682
683 683 @pytest.mark.skip_backends('svn')
684 684 def test_hooks_settings_are_created(
685 685 self, autologin_user, backend, csrf_token):
686 686 repo_name = backend.repo_name
687 687 data = self.FORM_DATA.copy()
688 688 data['csrf_token'] = csrf_token
689 689 self.app.post(
690 690 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
691 691 settings = SettingsModel(repo=repo_name)
692 692 try:
693 693 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
694 694 ui = settings.get_ui_by_section_and_key(section, key)
695 695 assert ui.ui_active is False
696 696 finally:
697 697 self._cleanup_repo_settings(settings)
698 698
699 699 def test_hooks_settings_are_not_created_for_svn(
700 700 self, autologin_user, backend_svn, csrf_token):
701 701 repo_name = backend_svn.repo_name
702 702 data = self.FORM_DATA.copy()
703 703 data['csrf_token'] = csrf_token
704 704 self.app.post(
705 705 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
706 706 settings = SettingsModel(repo=repo_name)
707 707 try:
708 708 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
709 709 ui = settings.get_ui_by_section_and_key(section, key)
710 710 assert ui is None
711 711 finally:
712 712 self._cleanup_repo_settings(settings)
713 713
714 714 @pytest.mark.skip_backends('svn')
715 715 def test_hooks_settings_are_updated(
716 716 self, autologin_user, backend, csrf_token):
717 717 repo_name = backend.repo_name
718 718 settings = SettingsModel(repo=repo_name)
719 719 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
720 720 settings.create_ui_section_value(section, '', key=key, active=True)
721 721
722 722 data = self.FORM_DATA.copy()
723 723 data['csrf_token'] = csrf_token
724 724 self.app.post(
725 725 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
726 726 try:
727 727 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
728 728 ui = settings.get_ui_by_section_and_key(section, key)
729 729 assert ui.ui_active is False
730 730 finally:
731 731 self._cleanup_repo_settings(settings)
732 732
733 733 def test_hooks_settings_are_not_updated_for_svn(
734 734 self, autologin_user, backend_svn, csrf_token):
735 735 repo_name = backend_svn.repo_name
736 736 settings = SettingsModel(repo=repo_name)
737 737 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
738 738 settings.create_ui_section_value(section, '', key=key, active=True)
739 739
740 740 data = self.FORM_DATA.copy()
741 741 data['csrf_token'] = csrf_token
742 742 self.app.post(
743 743 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
744 744 try:
745 745 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
746 746 ui = settings.get_ui_by_section_and_key(section, key)
747 747 assert ui.ui_active is True
748 748 finally:
749 749 self._cleanup_repo_settings(settings)
750 750
751 751 @pytest.mark.skip_backends('svn')
752 752 def test_pr_settings_are_created(
753 753 self, autologin_user, backend, csrf_token):
754 754 repo_name = backend.repo_name
755 755 data = self.FORM_DATA.copy()
756 756 data['csrf_token'] = csrf_token
757 757 self.app.post(
758 758 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
759 759 settings = SettingsModel(repo=repo_name)
760 760 try:
761 761 for name in VcsSettingsModel.GENERAL_SETTINGS:
762 762 setting = settings.get_setting_by_name(name)
763 763 assert setting.app_settings_value is False
764 764 finally:
765 765 self._cleanup_repo_settings(settings)
766 766
767 767 def test_pr_settings_are_not_created_for_svn(
768 768 self, autologin_user, backend_svn, csrf_token):
769 769 repo_name = backend_svn.repo_name
770 770 data = self.FORM_DATA.copy()
771 771 data['csrf_token'] = csrf_token
772 772 self.app.post(
773 773 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
774 774 settings = SettingsModel(repo=repo_name)
775 775 try:
776 776 for name in VcsSettingsModel.GENERAL_SETTINGS:
777 777 setting = settings.get_setting_by_name(name)
778 778 assert setting is None
779 779 finally:
780 780 self._cleanup_repo_settings(settings)
781 781
782 782 def test_pr_settings_creation_requires_repo_admin_permission(
783 783 self, backend, user_util, settings_util, csrf_token):
784 784 repo = backend.create_repo()
785 785 repo_name = repo.repo_name
786 786 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
787 787
788 788 logout_user_session(self.app, csrf_token)
789 789 session = login_user_session(
790 790 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
791 791 new_csrf_token = auth.get_csrf_token(session)
792 792
793 793 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
794 794 data = self.FORM_DATA.copy()
795 795 data['csrf_token'] = new_csrf_token
796 796 settings = SettingsModel(repo=repo_name)
797 797
798 798 try:
799 799 self.app.post(
800 800 url('repo_vcs_settings', repo_name=repo_name), data,
801 801 status=302)
802 802 finally:
803 803 self._cleanup_repo_settings(settings)
804 804
805 805 @pytest.mark.skip_backends('svn')
806 806 def test_pr_settings_are_updated(
807 807 self, autologin_user, backend, csrf_token):
808 808 repo_name = backend.repo_name
809 809 settings = SettingsModel(repo=repo_name)
810 810 for name in VcsSettingsModel.GENERAL_SETTINGS:
811 811 settings.create_or_update_setting(name, True, 'bool')
812 812
813 813 data = self.FORM_DATA.copy()
814 814 data['csrf_token'] = csrf_token
815 815 self.app.post(
816 816 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
817 817 try:
818 818 for name in VcsSettingsModel.GENERAL_SETTINGS:
819 819 setting = settings.get_setting_by_name(name)
820 820 assert setting.app_settings_value is False
821 821 finally:
822 822 self._cleanup_repo_settings(settings)
823 823
824 824 def test_pr_settings_are_not_updated_for_svn(
825 825 self, autologin_user, backend_svn, csrf_token):
826 826 repo_name = backend_svn.repo_name
827 827 settings = SettingsModel(repo=repo_name)
828 828 for name in VcsSettingsModel.GENERAL_SETTINGS:
829 829 settings.create_or_update_setting(name, True, 'bool')
830 830
831 831 data = self.FORM_DATA.copy()
832 832 data['csrf_token'] = csrf_token
833 833 self.app.post(
834 834 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
835 835 try:
836 836 for name in VcsSettingsModel.GENERAL_SETTINGS:
837 837 setting = settings.get_setting_by_name(name)
838 838 assert setting.app_settings_value is True
839 839 finally:
840 840 self._cleanup_repo_settings(settings)
841 841
842 842 def test_svn_settings_are_created(
843 843 self, autologin_user, backend_svn, csrf_token, settings_util):
844 844 repo_name = backend_svn.repo_name
845 845 data = self.FORM_DATA.copy()
846 846 data['new_svn_tag'] = 'svn-tag'
847 847 data['new_svn_branch'] = 'svn-branch'
848 848 data['csrf_token'] = csrf_token
849 849
850 850 # Create few global settings to make sure that uniqueness validators
851 851 # are not triggered
852 852 settings_util.create_rhodecode_ui(
853 853 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
854 854 settings_util.create_rhodecode_ui(
855 855 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
856 856
857 857 self.app.post(
858 858 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
859 859 settings = SettingsModel(repo=repo_name)
860 860 try:
861 861 svn_branches = settings.get_ui_by_section(
862 862 VcsSettingsModel.SVN_BRANCH_SECTION)
863 863 svn_branch_names = [b.ui_value for b in svn_branches]
864 864 svn_tags = settings.get_ui_by_section(
865 865 VcsSettingsModel.SVN_TAG_SECTION)
866 866 svn_tag_names = [b.ui_value for b in svn_tags]
867 867 assert 'svn-branch' in svn_branch_names
868 868 assert 'svn-tag' in svn_tag_names
869 869 finally:
870 870 self._cleanup_repo_settings(settings)
871 871
872 872 def test_svn_settings_are_unique(
873 873 self, autologin_user, backend_svn, csrf_token, settings_util):
874 874 repo = backend_svn.repo
875 875 repo_name = repo.repo_name
876 876 data = self.FORM_DATA.copy()
877 877 data['new_svn_tag'] = 'test_tag'
878 878 data['new_svn_branch'] = 'test_branch'
879 879 data['csrf_token'] = csrf_token
880 880 settings_util.create_repo_rhodecode_ui(
881 881 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
882 882 settings_util.create_repo_rhodecode_ui(
883 883 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
884 884
885 885 response = self.app.post(
886 886 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
887 887 response.mustcontain('Pattern already exists')
888 888
889 889 def test_svn_settings_with_empty_values_are_not_created(
890 890 self, autologin_user, backend_svn, csrf_token):
891 891 repo_name = backend_svn.repo_name
892 892 data = self.FORM_DATA.copy()
893 893 data['csrf_token'] = csrf_token
894 894 self.app.post(
895 895 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
896 896 settings = SettingsModel(repo=repo_name)
897 897 try:
898 898 svn_branches = settings.get_ui_by_section(
899 899 VcsSettingsModel.SVN_BRANCH_SECTION)
900 900 svn_tags = settings.get_ui_by_section(
901 901 VcsSettingsModel.SVN_TAG_SECTION)
902 902 assert len(svn_branches) == 0
903 903 assert len(svn_tags) == 0
904 904 finally:
905 905 self._cleanup_repo_settings(settings)
906 906
907 907 def test_svn_settings_are_shown_for_svn_repository(
908 908 self, autologin_user, backend_svn, csrf_token):
909 909 repo_name = backend_svn.repo_name
910 910 response = self.app.get(
911 911 url('repo_vcs_settings', repo_name=repo_name), status=200)
912 912 response.mustcontain('Subversion Settings')
913 913
914 914 @pytest.mark.skip_backends('svn')
915 915 def test_svn_settings_are_not_created_for_not_svn_repository(
916 916 self, autologin_user, backend, csrf_token):
917 917 repo_name = backend.repo_name
918 918 data = self.FORM_DATA.copy()
919 919 data['csrf_token'] = csrf_token
920 920 self.app.post(
921 921 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
922 922 settings = SettingsModel(repo=repo_name)
923 923 try:
924 924 svn_branches = settings.get_ui_by_section(
925 925 VcsSettingsModel.SVN_BRANCH_SECTION)
926 926 svn_tags = settings.get_ui_by_section(
927 927 VcsSettingsModel.SVN_TAG_SECTION)
928 928 assert len(svn_branches) == 0
929 929 assert len(svn_tags) == 0
930 930 finally:
931 931 self._cleanup_repo_settings(settings)
932 932
933 933 @pytest.mark.skip_backends('svn')
934 934 def test_svn_settings_are_shown_only_for_svn_repository(
935 935 self, autologin_user, backend, csrf_token):
936 936 repo_name = backend.repo_name
937 937 response = self.app.get(
938 938 url('repo_vcs_settings', repo_name=repo_name), status=200)
939 939 response.mustcontain(no='Subversion Settings')
940 940
941 941 def test_hg_settings_are_created(
942 942 self, autologin_user, backend_hg, csrf_token):
943 943 repo_name = backend_hg.repo_name
944 944 data = self.FORM_DATA.copy()
945 945 data['new_svn_tag'] = 'svn-tag'
946 946 data['new_svn_branch'] = 'svn-branch'
947 947 data['csrf_token'] = csrf_token
948 948 self.app.post(
949 949 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
950 950 settings = SettingsModel(repo=repo_name)
951 951 try:
952 952 largefiles_ui = settings.get_ui_by_section_and_key(
953 953 'extensions', 'largefiles')
954 954 assert largefiles_ui.ui_active is False
955 955 phases_ui = settings.get_ui_by_section_and_key(
956 956 'phases', 'publish')
957 957 assert str2bool(phases_ui.ui_value) is False
958 958 finally:
959 959 self._cleanup_repo_settings(settings)
960 960
961 961 def test_hg_settings_are_updated(
962 962 self, autologin_user, backend_hg, csrf_token):
963 963 repo_name = backend_hg.repo_name
964 964 settings = SettingsModel(repo=repo_name)
965 965 settings.create_ui_section_value(
966 966 'extensions', '', key='largefiles', active=True)
967 967 settings.create_ui_section_value(
968 968 'phases', '1', key='publish', active=True)
969 969
970 970 data = self.FORM_DATA.copy()
971 971 data['csrf_token'] = csrf_token
972 972 self.app.post(
973 973 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
974 974 try:
975 975 largefiles_ui = settings.get_ui_by_section_and_key(
976 976 'extensions', 'largefiles')
977 977 assert largefiles_ui.ui_active is False
978 978 phases_ui = settings.get_ui_by_section_and_key(
979 979 'phases', 'publish')
980 980 assert str2bool(phases_ui.ui_value) is False
981 981 finally:
982 982 self._cleanup_repo_settings(settings)
983 983
984 984 def test_hg_settings_are_shown_for_hg_repository(
985 985 self, autologin_user, backend_hg, csrf_token):
986 986 repo_name = backend_hg.repo_name
987 987 response = self.app.get(
988 988 url('repo_vcs_settings', repo_name=repo_name), status=200)
989 989 response.mustcontain('Mercurial Settings')
990 990
991 991 @pytest.mark.skip_backends('hg')
992 992 def test_hg_settings_are_created_only_for_hg_repository(
993 993 self, autologin_user, backend, csrf_token):
994 994 repo_name = backend.repo_name
995 995 data = self.FORM_DATA.copy()
996 996 data['csrf_token'] = csrf_token
997 997 self.app.post(
998 998 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
999 999 settings = SettingsModel(repo=repo_name)
1000 1000 try:
1001 1001 largefiles_ui = settings.get_ui_by_section_and_key(
1002 1002 'extensions', 'largefiles')
1003 1003 assert largefiles_ui is None
1004 1004 phases_ui = settings.get_ui_by_section_and_key(
1005 1005 'phases', 'publish')
1006 1006 assert phases_ui is None
1007 1007 finally:
1008 1008 self._cleanup_repo_settings(settings)
1009 1009
1010 1010 @pytest.mark.skip_backends('hg')
1011 1011 def test_hg_settings_are_shown_only_for_hg_repository(
1012 1012 self, autologin_user, backend, csrf_token):
1013 1013 repo_name = backend.repo_name
1014 1014 response = self.app.get(
1015 1015 url('repo_vcs_settings', repo_name=repo_name), status=200)
1016 1016 response.mustcontain(no='Mercurial Settings')
1017 1017
1018 1018 @pytest.mark.skip_backends('hg')
1019 1019 def test_hg_settings_are_updated_only_for_hg_repository(
1020 1020 self, autologin_user, backend, csrf_token):
1021 1021 repo_name = backend.repo_name
1022 1022 settings = SettingsModel(repo=repo_name)
1023 1023 settings.create_ui_section_value(
1024 1024 'extensions', '', key='largefiles', active=True)
1025 1025 settings.create_ui_section_value(
1026 1026 'phases', '1', key='publish', active=True)
1027 1027
1028 1028 data = self.FORM_DATA.copy()
1029 1029 data['csrf_token'] = csrf_token
1030 1030 self.app.post(
1031 1031 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1032 1032 try:
1033 1033 largefiles_ui = settings.get_ui_by_section_and_key(
1034 1034 'extensions', 'largefiles')
1035 1035 assert largefiles_ui.ui_active is True
1036 1036 phases_ui = settings.get_ui_by_section_and_key(
1037 1037 'phases', 'publish')
1038 1038 assert phases_ui.ui_value == '1'
1039 1039 finally:
1040 1040 self._cleanup_repo_settings(settings)
1041 1041
1042 1042 def test_per_repo_svn_settings_are_displayed(
1043 1043 self, autologin_user, backend_svn, settings_util):
1044 1044 repo = backend_svn.create_repo()
1045 1045 repo_name = repo.repo_name
1046 1046 branches = [
1047 1047 settings_util.create_repo_rhodecode_ui(
1048 1048 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1049 1049 'branch_{}'.format(i))
1050 1050 for i in range(10)]
1051 1051 tags = [
1052 1052 settings_util.create_repo_rhodecode_ui(
1053 1053 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1054 1054 for i in range(10)]
1055 1055
1056 1056 response = self.app.get(
1057 1057 url('repo_vcs_settings', repo_name=repo_name), status=200)
1058 1058 assert_response = AssertResponse(response)
1059 1059 for branch in branches:
1060 1060 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1061 1061 element = assert_response.get_element(css_selector)
1062 1062 assert element.value == branch.ui_value
1063 1063 for tag in tags:
1064 1064 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1065 1065 element = assert_response.get_element(css_selector)
1066 1066 assert element.value == tag.ui_value
1067 1067
1068 1068 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1069 1069 self, autologin_user, backend_svn, settings_util):
1070 1070 repo = backend_svn.create_repo()
1071 1071 repo_name = repo.repo_name
1072 1072 response = self.app.get(
1073 1073 url('repo_vcs_settings', repo_name=repo_name), status=200)
1074 1074 response.mustcontain(no='<label>Hooks:</label>')
1075 1075 response.mustcontain(no='<label>Pull Request Settings:</label>')
1076 1076
1077 1077 def test_inherit_global_settings_value_is_saved(
1078 1078 self, autologin_user, backend, csrf_token):
1079 1079 repo_name = backend.repo_name
1080 1080 data = self.FORM_DATA.copy()
1081 1081 data['csrf_token'] = csrf_token
1082 1082 data['inherit_global_settings'] = True
1083 1083 self.app.post(
1084 1084 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1085 1085
1086 1086 settings = SettingsModel(repo=repo_name)
1087 1087 vcs_settings = VcsSettingsModel(repo=repo_name)
1088 1088 try:
1089 1089 assert vcs_settings.inherit_global_settings is True
1090 1090 finally:
1091 1091 self._cleanup_repo_settings(settings)
1092 1092
1093 1093 def test_repo_cache_is_invalidated_when_settings_are_updated(
1094 1094 self, autologin_user, backend, csrf_token):
1095 1095 repo_name = backend.repo_name
1096 1096 data = self.FORM_DATA.copy()
1097 1097 data['csrf_token'] = csrf_token
1098 1098 data['inherit_global_settings'] = True
1099 1099 settings = SettingsModel(repo=repo_name)
1100 1100
1101 1101 invalidation_patcher = mock.patch(
1102 1102 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1103 1103 with invalidation_patcher as invalidation_mock:
1104 1104 self.app.post(
1105 1105 url('repo_vcs_settings', repo_name=repo_name), data,
1106 1106 status=302)
1107 1107 try:
1108 1108 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1109 1109 finally:
1110 1110 self._cleanup_repo_settings(settings)
1111 1111
1112 1112 def test_other_settings_not_saved_inherit_global_settings_is_true(
1113 1113 self, autologin_user, backend, csrf_token):
1114 1114 repo_name = backend.repo_name
1115 1115 data = self.FORM_DATA.copy()
1116 1116 data['csrf_token'] = csrf_token
1117 1117 data['inherit_global_settings'] = True
1118 1118 self.app.post(
1119 1119 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1120 1120
1121 1121 settings = SettingsModel(repo=repo_name)
1122 1122 ui_settings = (
1123 1123 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1124 1124
1125 1125 vcs_settings = []
1126 1126 try:
1127 1127 for section, key in ui_settings:
1128 1128 ui = settings.get_ui_by_section_and_key(section, key)
1129 1129 if ui:
1130 1130 vcs_settings.append(ui)
1131 1131 vcs_settings.extend(settings.get_ui_by_section(
1132 1132 VcsSettingsModel.SVN_BRANCH_SECTION))
1133 1133 vcs_settings.extend(settings.get_ui_by_section(
1134 1134 VcsSettingsModel.SVN_TAG_SECTION))
1135 1135 for name in VcsSettingsModel.GENERAL_SETTINGS:
1136 1136 setting = settings.get_setting_by_name(name)
1137 1137 if setting:
1138 1138 vcs_settings.append(setting)
1139 1139 assert vcs_settings == []
1140 1140 finally:
1141 1141 self._cleanup_repo_settings(settings)
1142 1142
1143 1143 def test_delete_svn_branch_and_tag_patterns(
1144 1144 self, autologin_user, backend_svn, settings_util, csrf_token):
1145 1145 repo = backend_svn.create_repo()
1146 1146 repo_name = repo.repo_name
1147 1147 branch = settings_util.create_repo_rhodecode_ui(
1148 1148 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1149 1149 cleanup=False)
1150 1150 tag = settings_util.create_repo_rhodecode_ui(
1151 1151 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1152 1152 data = {
1153 1153 '_method': 'delete',
1154 1154 'csrf_token': csrf_token
1155 1155 }
1156 1156 for id_ in (branch.ui_id, tag.ui_id):
1157 1157 data['delete_svn_pattern'] = id_,
1158 1158 self.app.post(
1159 1159 url('repo_vcs_settings', repo_name=repo_name), data,
1160 1160 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1161 1161 settings = VcsSettingsModel(repo=repo_name)
1162 1162 assert settings.get_repo_svn_branch_patterns() == []
1163 1163
1164 1164 def test_delete_svn_branch_requires_repo_admin_permission(
1165 1165 self, backend_svn, user_util, settings_util, csrf_token):
1166 1166 repo = backend_svn.create_repo()
1167 1167 repo_name = repo.repo_name
1168 1168 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1169 1169 logout_user_session(self.app, csrf_token)
1170 1170 session = login_user_session(
1171 1171 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1172 1172 csrf_token = auth.get_csrf_token(session)
1173 1173 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1174 1174 branch = settings_util.create_repo_rhodecode_ui(
1175 1175 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1176 1176 cleanup=False)
1177 1177 data = {
1178 1178 '_method': 'delete',
1179 1179 'csrf_token': csrf_token,
1180 1180 'delete_svn_pattern': branch.ui_id
1181 1181 }
1182 1182 self.app.post(
1183 1183 url('repo_vcs_settings', repo_name=repo_name), data,
1184 1184 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1185 1185
1186 1186 def test_delete_svn_branch_raises_400_when_not_found(
1187 1187 self, autologin_user, backend_svn, settings_util, csrf_token):
1188 1188 repo_name = backend_svn.repo_name
1189 1189 data = {
1190 1190 '_method': 'delete',
1191 1191 'delete_svn_pattern': 123,
1192 1192 'csrf_token': csrf_token
1193 1193 }
1194 1194 self.app.post(
1195 1195 url('repo_vcs_settings', repo_name=repo_name), data,
1196 1196 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1197 1197
1198 1198 def test_delete_svn_branch_raises_400_when_no_id_specified(
1199 1199 self, autologin_user, backend_svn, settings_util, csrf_token):
1200 1200 repo_name = backend_svn.repo_name
1201 1201 data = {
1202 1202 '_method': 'delete',
1203 1203 'csrf_token': csrf_token
1204 1204 }
1205 1205 self.app.post(
1206 1206 url('repo_vcs_settings', repo_name=repo_name), data,
1207 1207 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1208 1208
1209 1209 def _cleanup_repo_settings(self, settings_model):
1210 1210 cleanup = []
1211 1211 ui_settings = (
1212 1212 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1213 1213
1214 1214 for section, key in ui_settings:
1215 1215 ui = settings_model.get_ui_by_section_and_key(section, key)
1216 1216 if ui:
1217 1217 cleanup.append(ui)
1218 1218
1219 1219 cleanup.extend(settings_model.get_ui_by_section(
1220 1220 VcsSettingsModel.INHERIT_SETTINGS))
1221 1221 cleanup.extend(settings_model.get_ui_by_section(
1222 1222 VcsSettingsModel.SVN_BRANCH_SECTION))
1223 1223 cleanup.extend(settings_model.get_ui_by_section(
1224 1224 VcsSettingsModel.SVN_TAG_SECTION))
1225 1225
1226 1226 for name in VcsSettingsModel.GENERAL_SETTINGS:
1227 1227 setting = settings_model.get_setting_by_name(name)
1228 1228 if setting:
1229 1229 cleanup.append(setting)
1230 1230
1231 1231 for object_ in cleanup:
1232 1232 Session().delete(object_)
1233 1233 Session().commit()
1234 1234
1235 1235 def assert_repo_value_equals_global_value(self, response, setting):
1236 1236 assert_response = AssertResponse(response)
1237 1237 global_css_selector = '[name={}_inherited]'.format(setting)
1238 1238 repo_css_selector = '[name={}]'.format(setting)
1239 1239 repo_element = assert_response.get_element(repo_css_selector)
1240 1240 global_element = assert_response.get_element(global_css_selector)
1241 1241 assert repo_element.value == global_element.value
1242 1242
1243 1243
1244 1244 def _get_permission_for_user(user, repo):
1245 1245 perm = UserRepoToPerm.query()\
1246 1246 .filter(UserRepoToPerm.repository ==
1247 1247 Repository.get_by_repo_name(repo))\
1248 1248 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1249 1249 .all()
1250 1250 return perm
@@ -1,1034 +1,1034 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.lib.utils2 import str2bool
25 25 from rhodecode.model.meta import Session
26 26 from rhodecode.model.settings import VcsSettingsModel, UiSetting
27 27
28 28
29 29 HOOKS_FORM_DATA = {
30 30 'hooks_changegroup_repo_size': True,
31 31 'hooks_changegroup_push_logger': True,
32 32 'hooks_outgoing_pull_logger': True
33 33 }
34 34
35 35 SVN_FORM_DATA = {
36 36 'new_svn_branch': 'test-branch',
37 37 'new_svn_tag': 'test-tag'
38 38 }
39 39
40 40 GENERAL_FORM_DATA = {
41 41 'rhodecode_pr_merge_enabled': True,
42 42 'rhodecode_use_outdated_comments': True,
43 43 'rhodecode_hg_use_rebase_for_merging': True,
44 44 }
45 45
46 46
47 47 class TestInheritGlobalSettingsProperty(object):
48 48 def test_get_raises_exception_when_repository_not_specified(self):
49 49 model = VcsSettingsModel()
50 50 with pytest.raises(Exception) as exc_info:
51 51 model.inherit_global_settings
52 52 assert exc_info.value.message == 'Repository is not specified'
53 53
54 54 def test_true_is_returned_when_value_is_not_found(self, repo_stub):
55 55 model = VcsSettingsModel(repo=repo_stub.repo_name)
56 56 assert model.inherit_global_settings is True
57 57
58 58 def test_value_is_returned(self, repo_stub, settings_util):
59 59 model = VcsSettingsModel(repo=repo_stub.repo_name)
60 60 settings_util.create_repo_rhodecode_setting(
61 61 repo_stub, VcsSettingsModel.INHERIT_SETTINGS, False, 'bool')
62 62 assert model.inherit_global_settings is False
63 63
64 64 def test_value_is_set(self, repo_stub):
65 65 model = VcsSettingsModel(repo=repo_stub.repo_name)
66 66 model.inherit_global_settings = False
67 67 setting = model.repo_settings.get_setting_by_name(
68 68 VcsSettingsModel.INHERIT_SETTINGS)
69 69 try:
70 70 assert setting.app_settings_type == 'bool'
71 71 assert setting.app_settings_value is False
72 72 finally:
73 73 Session().delete(setting)
74 74 Session().commit()
75 75
76 76 def test_set_raises_exception_when_repository_not_specified(self):
77 77 model = VcsSettingsModel()
78 78 with pytest.raises(Exception) as exc_info:
79 79 model.inherit_global_settings = False
80 80 assert exc_info.value.message == 'Repository is not specified'
81 81
82 82
83 83 class TestVcsSettingsModel(object):
84 84 def test_global_svn_branch_patterns(self):
85 85 model = VcsSettingsModel()
86 86 expected_result = {'test': 'test'}
87 87 with mock.patch.object(model, 'global_settings') as settings_mock:
88 88 get_settings = settings_mock.get_ui_by_section
89 89 get_settings.return_value = expected_result
90 90 settings_mock.return_value = expected_result
91 91 result = model.get_global_svn_branch_patterns()
92 92
93 93 get_settings.assert_called_once_with(model.SVN_BRANCH_SECTION)
94 94 assert expected_result == result
95 95
96 96 def test_repo_svn_branch_patterns(self):
97 97 model = VcsSettingsModel()
98 98 expected_result = {'test': 'test'}
99 99 with mock.patch.object(model, 'repo_settings') as settings_mock:
100 100 get_settings = settings_mock.get_ui_by_section
101 101 get_settings.return_value = expected_result
102 102 settings_mock.return_value = expected_result
103 103 result = model.get_repo_svn_branch_patterns()
104 104
105 105 get_settings.assert_called_once_with(model.SVN_BRANCH_SECTION)
106 106 assert expected_result == result
107 107
108 108 def test_repo_svn_branch_patterns_raises_exception_when_repo_is_not_set(
109 109 self):
110 110 model = VcsSettingsModel()
111 111 with pytest.raises(Exception) as exc_info:
112 112 model.get_repo_svn_branch_patterns()
113 113 assert exc_info.value.message == 'Repository is not specified'
114 114
115 115 def test_global_svn_tag_patterns(self):
116 116 model = VcsSettingsModel()
117 117 expected_result = {'test': 'test'}
118 118 with mock.patch.object(model, 'global_settings') as settings_mock:
119 119 get_settings = settings_mock.get_ui_by_section
120 120 get_settings.return_value = expected_result
121 121 settings_mock.return_value = expected_result
122 122 result = model.get_global_svn_tag_patterns()
123 123
124 124 get_settings.assert_called_once_with(model.SVN_TAG_SECTION)
125 125 assert expected_result == result
126 126
127 127 def test_repo_svn_tag_patterns(self):
128 128 model = VcsSettingsModel()
129 129 expected_result = {'test': 'test'}
130 130 with mock.patch.object(model, 'repo_settings') as settings_mock:
131 131 get_settings = settings_mock.get_ui_by_section
132 132 get_settings.return_value = expected_result
133 133 settings_mock.return_value = expected_result
134 134 result = model.get_repo_svn_tag_patterns()
135 135
136 136 get_settings.assert_called_once_with(model.SVN_TAG_SECTION)
137 137 assert expected_result == result
138 138
139 139 def test_repo_svn_tag_patterns_raises_exception_when_repo_is_not_set(self):
140 140 model = VcsSettingsModel()
141 141 with pytest.raises(Exception) as exc_info:
142 142 model.get_repo_svn_tag_patterns()
143 143 assert exc_info.value.message == 'Repository is not specified'
144 144
145 145 def test_get_global_settings(self):
146 146 expected_result = {'test': 'test'}
147 147 model = VcsSettingsModel()
148 148 with mock.patch.object(model, '_collect_all_settings') as collect_mock:
149 149 collect_mock.return_value = expected_result
150 150 result = model.get_global_settings()
151 151
152 152 collect_mock.assert_called_once_with(global_=True)
153 153 assert result == expected_result
154 154
155 155 def test_get_repo_settings(self, repo_stub):
156 156 model = VcsSettingsModel(repo=repo_stub.repo_name)
157 157 expected_result = {'test': 'test'}
158 158 with mock.patch.object(model, '_collect_all_settings') as collect_mock:
159 159 collect_mock.return_value = expected_result
160 160 result = model.get_repo_settings()
161 161
162 162 collect_mock.assert_called_once_with(global_=False)
163 163 assert result == expected_result
164 164
165 165 @pytest.mark.parametrize('settings, global_', [
166 166 ('global_settings', True),
167 167 ('repo_settings', False)
168 168 ])
169 169 def test_collect_all_settings(self, settings, global_):
170 170 model = VcsSettingsModel()
171 171 result_mock = self._mock_result()
172 172
173 173 settings_patch = mock.patch.object(model, settings)
174 174 with settings_patch as settings_mock:
175 175 settings_mock.get_ui_by_section_and_key.return_value = result_mock
176 176 settings_mock.get_setting_by_name.return_value = result_mock
177 177 result = model._collect_all_settings(global_=global_)
178 178
179 179 ui_settings = model.HG_SETTINGS + model.HOOKS_SETTINGS
180 180 self._assert_get_settings_calls(
181 181 settings_mock, ui_settings, model.GENERAL_SETTINGS)
182 182 self._assert_collect_all_settings_result(
183 183 ui_settings, model.GENERAL_SETTINGS, result)
184 184
185 185 @pytest.mark.parametrize('settings, global_', [
186 186 ('global_settings', True),
187 187 ('repo_settings', False)
188 188 ])
189 189 def test_collect_all_settings_without_empty_value(self, settings, global_):
190 190 model = VcsSettingsModel()
191 191
192 192 settings_patch = mock.patch.object(model, settings)
193 193 with settings_patch as settings_mock:
194 194 settings_mock.get_ui_by_section_and_key.return_value = None
195 195 settings_mock.get_setting_by_name.return_value = None
196 196 result = model._collect_all_settings(global_=global_)
197 197
198 198 assert result == {}
199 199
200 200 def _mock_result(self):
201 201 result_mock = mock.Mock()
202 202 result_mock.ui_value = 'ui_value'
203 203 result_mock.ui_active = True
204 204 result_mock.app_settings_value = 'setting_value'
205 205 return result_mock
206 206
207 207 def _assert_get_settings_calls(
208 208 self, settings_mock, ui_settings, general_settings):
209 209 assert (
210 210 settings_mock.get_ui_by_section_and_key.call_count ==
211 211 len(ui_settings))
212 212 assert (
213 213 settings_mock.get_setting_by_name.call_count ==
214 214 len(general_settings))
215 215
216 216 for section, key in ui_settings:
217 217 expected_call = mock.call(section, key)
218 218 assert (
219 219 expected_call in
220 220 settings_mock.get_ui_by_section_and_key.call_args_list)
221 221
222 222 for name in general_settings:
223 223 expected_call = mock.call(name)
224 224 assert (
225 225 expected_call in
226 226 settings_mock.get_setting_by_name.call_args_list)
227 227
228 228 def _assert_collect_all_settings_result(
229 229 self, ui_settings, general_settings, result):
230 230 expected_result = {}
231 231 for section, key in ui_settings:
232 232 key = '{}_{}'.format(section, key.replace('.', '_'))
233 233 value = True if section in ('extensions', 'hooks') else 'ui_value'
234 234 expected_result[key] = value
235 235
236 236 for name in general_settings:
237 237 key = 'rhodecode_' + name
238 238 expected_result[key] = 'setting_value'
239 239
240 240 assert expected_result == result
241 241
242 242
243 243 class TestCreateOrUpdateRepoHookSettings(object):
244 244 def test_create_when_no_repo_object_found(self, repo_stub):
245 245 model = VcsSettingsModel(repo=repo_stub.repo_name)
246 246
247 247 self._create_settings(model, HOOKS_FORM_DATA)
248 248
249 249 cleanup = []
250 250 try:
251 251 for section, key in model.HOOKS_SETTINGS:
252 252 ui = model.repo_settings.get_ui_by_section_and_key(
253 253 section, key)
254 254 assert ui.ui_active is True
255 255 cleanup.append(ui)
256 256 finally:
257 257 for ui in cleanup:
258 258 Session().delete(ui)
259 259 Session().commit()
260 260
261 261 def test_create_raises_exception_when_data_incomplete(self, repo_stub):
262 262 model = VcsSettingsModel(repo=repo_stub.repo_name)
263 263
264 264 deleted_key = 'hooks_changegroup_repo_size'
265 265 data = HOOKS_FORM_DATA.copy()
266 266 data.pop(deleted_key)
267 267
268 268 with pytest.raises(ValueError) as exc_info:
269 269 model.create_or_update_repo_hook_settings(data)
270 270 assert (
271 271 exc_info.value.message ==
272 272 'The given data does not contain {} key'.format(deleted_key))
273 273
274 274 def test_update_when_repo_object_found(self, repo_stub, settings_util):
275 275 model = VcsSettingsModel(repo=repo_stub.repo_name)
276 276 for section, key in model.HOOKS_SETTINGS:
277 277 settings_util.create_repo_rhodecode_ui(
278 278 repo_stub, section, None, key=key, active=False)
279 279 model.create_or_update_repo_hook_settings(HOOKS_FORM_DATA)
280 280 for section, key in model.HOOKS_SETTINGS:
281 281 ui = model.repo_settings.get_ui_by_section_and_key(section, key)
282 282 assert ui.ui_active is True
283 283
284 284 def _create_settings(self, model, data):
285 285 global_patch = mock.patch.object(model, 'global_settings')
286 286 global_setting = mock.Mock()
287 287 global_setting.ui_value = 'Test value'
288 288 with global_patch as global_mock:
289 289 global_mock.get_ui_by_section_and_key.return_value = global_setting
290 290 model.create_or_update_repo_hook_settings(HOOKS_FORM_DATA)
291 291
292 292
293 293 class TestUpdateGlobalHookSettings(object):
294 294 def test_update_raises_exception_when_data_incomplete(self):
295 295 model = VcsSettingsModel()
296 296
297 297 deleted_key = 'hooks_changegroup_repo_size'
298 298 data = HOOKS_FORM_DATA.copy()
299 299 data.pop(deleted_key)
300 300
301 301 with pytest.raises(ValueError) as exc_info:
302 302 model.update_global_hook_settings(data)
303 303 assert (
304 304 exc_info.value.message ==
305 305 'The given data does not contain {} key'.format(deleted_key))
306 306
307 307 def test_update_global_hook_settings(self, settings_util):
308 308 model = VcsSettingsModel()
309 309 setting_mock = mock.MagicMock()
310 310 setting_mock.ui_active = False
311 311 get_settings_patcher = mock.patch.object(
312 312 model.global_settings, 'get_ui_by_section_and_key',
313 313 return_value=setting_mock)
314 314 session_patcher = mock.patch('rhodecode.model.settings.Session')
315 315 with get_settings_patcher as get_settings_mock, session_patcher:
316 316 model.update_global_hook_settings(HOOKS_FORM_DATA)
317 317 assert setting_mock.ui_active is True
318 318 assert get_settings_mock.call_count == 3
319 319
320 320
321 321 class TestCreateOrUpdateRepoGeneralSettings(object):
322 322 def test_calls_create_or_update_general_settings(self, repo_stub):
323 323 model = VcsSettingsModel(repo=repo_stub.repo_name)
324 324 create_patch = mock.patch.object(
325 325 model, '_create_or_update_general_settings')
326 326 with create_patch as create_mock:
327 327 model.create_or_update_repo_pr_settings(GENERAL_FORM_DATA)
328 328 create_mock.assert_called_once_with(
329 329 model.repo_settings, GENERAL_FORM_DATA)
330 330
331 331 def test_raises_exception_when_repository_is_not_specified(self):
332 332 model = VcsSettingsModel()
333 333 with pytest.raises(Exception) as exc_info:
334 334 model.create_or_update_repo_pr_settings(GENERAL_FORM_DATA)
335 335 assert exc_info.value.message == 'Repository is not specified'
336 336
337 337
338 338 class TestCreateOrUpdatGlobalGeneralSettings(object):
339 339 def test_calls_create_or_update_general_settings(self):
340 340 model = VcsSettingsModel()
341 341 create_patch = mock.patch.object(
342 342 model, '_create_or_update_general_settings')
343 343 with create_patch as create_mock:
344 344 model.create_or_update_global_pr_settings(GENERAL_FORM_DATA)
345 345 create_mock.assert_called_once_with(
346 346 model.global_settings, GENERAL_FORM_DATA)
347 347
348 348
349 349 class TestCreateOrUpdateGeneralSettings(object):
350 350 def test_create_when_no_repo_settings_found(self, repo_stub):
351 351 model = VcsSettingsModel(repo=repo_stub.repo_name)
352 352 model._create_or_update_general_settings(
353 353 model.repo_settings, GENERAL_FORM_DATA)
354 354
355 355 cleanup = []
356 356 try:
357 357 for name in model.GENERAL_SETTINGS:
358 358 setting = model.repo_settings.get_setting_by_name(name)
359 359 assert setting.app_settings_value is True
360 360 cleanup.append(setting)
361 361 finally:
362 362 for setting in cleanup:
363 363 Session().delete(setting)
364 364 Session().commit()
365 365
366 366 def test_create_raises_exception_when_data_incomplete(self, repo_stub):
367 367 model = VcsSettingsModel(repo=repo_stub.repo_name)
368 368
369 369 deleted_key = 'rhodecode_pr_merge_enabled'
370 370 data = GENERAL_FORM_DATA.copy()
371 371 data.pop(deleted_key)
372 372
373 373 with pytest.raises(ValueError) as exc_info:
374 374 model._create_or_update_general_settings(model.repo_settings, data)
375 375 assert (
376 376 exc_info.value.message ==
377 377 'The given data does not contain {} key'.format(deleted_key))
378 378
379 379 def test_update_when_repo_setting_found(self, repo_stub, settings_util):
380 380 model = VcsSettingsModel(repo=repo_stub.repo_name)
381 381 for name in model.GENERAL_SETTINGS:
382 382 settings_util.create_repo_rhodecode_setting(
383 383 repo_stub, name, False, 'bool')
384 384
385 385 model._create_or_update_general_settings(
386 386 model.repo_settings, GENERAL_FORM_DATA)
387 387
388 388 for name in model.GENERAL_SETTINGS:
389 389 setting = model.repo_settings.get_setting_by_name(name)
390 390 assert setting.app_settings_value is True
391 391
392 392
393 393 class TestCreateRepoSvnSettings(object):
394 394 def test_calls_create_svn_settings(self, repo_stub):
395 395 model = VcsSettingsModel(repo=repo_stub.repo_name)
396 396 with mock.patch.object(model, '_create_svn_settings') as create_mock:
397 397 model.create_repo_svn_settings(SVN_FORM_DATA)
398 398 create_mock.assert_called_once_with(model.repo_settings, SVN_FORM_DATA)
399 399
400 400 def test_raises_exception_when_repository_is_not_specified(self):
401 401 model = VcsSettingsModel()
402 402 with pytest.raises(Exception) as exc_info:
403 403 model.create_repo_svn_settings(SVN_FORM_DATA)
404 404 assert exc_info.value.message == 'Repository is not specified'
405 405
406 406
407 407 class TestCreateSvnSettings(object):
408 408 def test_create(self, repo_stub):
409 409 model = VcsSettingsModel(repo=repo_stub.repo_name)
410 410 model._create_svn_settings(model.repo_settings, SVN_FORM_DATA)
411 411 Session().commit()
412 412
413 413 branch_ui = model.repo_settings.get_ui_by_section(
414 414 model.SVN_BRANCH_SECTION)
415 415 tag_ui = model.repo_settings.get_ui_by_section(
416 416 model.SVN_TAG_SECTION)
417 417
418 418 try:
419 419 assert len(branch_ui) == 1
420 420 assert len(tag_ui) == 1
421 421 finally:
422 422 Session().delete(branch_ui[0])
423 423 Session().delete(tag_ui[0])
424 424 Session().commit()
425 425
426 426 def test_create_tag(self, repo_stub):
427 427 model = VcsSettingsModel(repo=repo_stub.repo_name)
428 428 data = SVN_FORM_DATA.copy()
429 429 data.pop('new_svn_branch')
430 430 model._create_svn_settings(model.repo_settings, data)
431 431 Session().commit()
432 432
433 433 branch_ui = model.repo_settings.get_ui_by_section(
434 434 model.SVN_BRANCH_SECTION)
435 435 tag_ui = model.repo_settings.get_ui_by_section(
436 436 model.SVN_TAG_SECTION)
437 437
438 438 try:
439 439 assert len(branch_ui) == 0
440 440 assert len(tag_ui) == 1
441 441 finally:
442 442 Session().delete(tag_ui[0])
443 443 Session().commit()
444 444
445 445 def test_create_nothing_when_no_svn_settings_specified(self, repo_stub):
446 446 model = VcsSettingsModel(repo=repo_stub.repo_name)
447 447 model._create_svn_settings(model.repo_settings, {})
448 448 Session().commit()
449 449
450 450 branch_ui = model.repo_settings.get_ui_by_section(
451 451 model.SVN_BRANCH_SECTION)
452 452 tag_ui = model.repo_settings.get_ui_by_section(
453 453 model.SVN_TAG_SECTION)
454 454
455 455 assert len(branch_ui) == 0
456 456 assert len(tag_ui) == 0
457 457
458 458 def test_create_nothing_when_empty_settings_specified(self, repo_stub):
459 459 model = VcsSettingsModel(repo=repo_stub.repo_name)
460 460 data = {
461 461 'new_svn_branch': '',
462 462 'new_svn_tag': ''
463 463 }
464 464 model._create_svn_settings(model.repo_settings, data)
465 465 Session().commit()
466 466
467 467 branch_ui = model.repo_settings.get_ui_by_section(
468 468 model.SVN_BRANCH_SECTION)
469 469 tag_ui = model.repo_settings.get_ui_by_section(
470 470 model.SVN_TAG_SECTION)
471 471
472 472 assert len(branch_ui) == 0
473 473 assert len(tag_ui) == 0
474 474
475 475
476 476 class TestCreateOrUpdateUi(object):
477 477 def test_create(self, repo_stub):
478 478 model = VcsSettingsModel(repo=repo_stub.repo_name)
479 479 model._create_or_update_ui(
480 480 model.repo_settings, 'test-section', 'test-key', active=False,
481 481 value='False')
482 482 Session().commit()
483 483
484 484 created_ui = model.repo_settings.get_ui_by_section_and_key(
485 485 'test-section', 'test-key')
486 486
487 487 try:
488 488 assert created_ui.ui_active is False
489 489 assert str2bool(created_ui.ui_value) is False
490 490 finally:
491 491 Session().delete(created_ui)
492 492 Session().commit()
493 493
494 494 def test_update(self, repo_stub, settings_util):
495 495 model = VcsSettingsModel(repo=repo_stub.repo_name)
496 496
497 497 largefiles, phases = model.HG_SETTINGS
498 498 section = 'test-section'
499 499 key = 'test-key'
500 500 settings_util.create_repo_rhodecode_ui(
501 501 repo_stub, section, 'True', key=key, active=True)
502 502
503 503 model._create_or_update_ui(
504 504 model.repo_settings, section, key, active=False, value='False')
505 505 Session().commit()
506 506
507 507 created_ui = model.repo_settings.get_ui_by_section_and_key(
508 508 section, key)
509 509 assert created_ui.ui_active is False
510 510 assert str2bool(created_ui.ui_value) is False
511 511
512 512
513 513 class TestCreateOrUpdateRepoHgSettings(object):
514 514 FORM_DATA = {
515 515 'extensions_largefiles': False,
516 516 'phases_publish': False
517 517 }
518 518
519 519 def test_creates_repo_hg_settings_when_data_is_correct(self, repo_stub):
520 520 model = VcsSettingsModel(repo=repo_stub.repo_name)
521 521 with mock.patch.object(model, '_create_or_update_ui') as create_mock:
522 522 model.create_or_update_repo_hg_settings(self.FORM_DATA)
523 523 expected_calls = [
524 524 mock.call(model.repo_settings, 'extensions', 'largefiles',
525 525 active=False, value=''),
526 526 mock.call(model.repo_settings, 'phases', 'publish', value='False'),
527 527 ]
528 528 assert expected_calls == create_mock.call_args_list
529 529
530 530 @pytest.mark.parametrize('field_to_remove', FORM_DATA.keys())
531 531 def test_key_is_not_found(self, repo_stub, field_to_remove):
532 532 model = VcsSettingsModel(repo=repo_stub.repo_name)
533 533 data = self.FORM_DATA.copy()
534 534 data.pop(field_to_remove)
535 535 with pytest.raises(ValueError) as exc_info:
536 536 model.create_or_update_repo_hg_settings(data)
537 537 expected_message = 'The given data does not contain {} key'.format(
538 538 field_to_remove)
539 539 assert exc_info.value.message == expected_message
540 540
541 541 def test_create_raises_exception_when_repository_not_specified(self):
542 542 model = VcsSettingsModel()
543 543 with pytest.raises(Exception) as exc_info:
544 544 model.create_or_update_repo_hg_settings(self.FORM_DATA)
545 545 assert exc_info.value.message == 'Repository is not specified'
546 546
547 547
548 548 class TestUpdateGlobalSslSetting(object):
549 549 def test_updates_global_hg_settings(self):
550 550 model = VcsSettingsModel()
551 551 with mock.patch.object(model, '_create_or_update_ui') as create_mock:
552 552 model.update_global_ssl_setting('False')
553 553 create_mock.assert_called_once_with(
554 554 model.global_settings, 'web', 'push_ssl', value='False')
555 555
556 556
557 557 class TestUpdateGlobalPathSetting(object):
558 558 def test_updates_global_path_settings(self):
559 559 model = VcsSettingsModel()
560 560 with mock.patch.object(model, '_create_or_update_ui') as create_mock:
561 561 model.update_global_path_setting('False')
562 562 create_mock.assert_called_once_with(
563 563 model.global_settings, 'paths', '/', value='False')
564 564
565 565
566 566 class TestCreateOrUpdateGlobalHgSettings(object):
567 567 FORM_DATA = {
568 568 'extensions_largefiles': False,
569 569 'largefiles_usercache': '/example/largefiles-store',
570 570 'phases_publish': False,
571 571 'extensions_hgsubversion': False
572 572 }
573 573
574 574 def test_creates_repo_hg_settings_when_data_is_correct(self):
575 575 model = VcsSettingsModel()
576 576 with mock.patch.object(model, '_create_or_update_ui') as create_mock:
577 577 model.create_or_update_global_hg_settings(self.FORM_DATA)
578 578 expected_calls = [
579 579 mock.call(model.global_settings, 'extensions', 'largefiles',
580 580 active=False, value=''),
581 581 mock.call(model.global_settings, 'largefiles', 'usercache',
582 582 value='/example/largefiles-store'),
583 583 mock.call(model.global_settings, 'phases', 'publish',
584 584 value='False'),
585 585 mock.call(model.global_settings, 'extensions', 'hgsubversion',
586 586 active=False)
587 587 ]
588 588 assert expected_calls == create_mock.call_args_list
589 589
590 590 @pytest.mark.parametrize('field_to_remove', FORM_DATA.keys())
591 591 def test_key_is_not_found(self, repo_stub, field_to_remove):
592 592 model = VcsSettingsModel(repo=repo_stub.repo_name)
593 593 data = self.FORM_DATA.copy()
594 594 data.pop(field_to_remove)
595 595 with pytest.raises(Exception) as exc_info:
596 596 model.create_or_update_global_hg_settings(data)
597 597 expected_message = 'The given data does not contain {} key'.format(
598 598 field_to_remove)
599 599 assert exc_info.value.message == expected_message
600 600
601 601
602 602 class TestDeleteRepoSvnPattern(object):
603 603 def test_success_when_repo_is_set(self, backend_svn):
604 604 repo_name = backend_svn.repo_name
605 605 model = VcsSettingsModel(repo=repo_name)
606 606 delete_ui_patch = mock.patch.object(model.repo_settings, 'delete_ui')
607 607 with delete_ui_patch as delete_ui_mock:
608 608 model.delete_repo_svn_pattern(123)
609 609 delete_ui_mock.assert_called_once_with(123)
610 610
611 611 def test_raises_exception_when_repository_is_not_specified(self):
612 612 model = VcsSettingsModel()
613 613 with pytest.raises(Exception) as exc_info:
614 614 model.delete_repo_svn_pattern(123)
615 615 assert exc_info.value.message == 'Repository is not specified'
616 616
617 617
618 618 class TestDeleteGlobalSvnPattern(object):
619 619 def test_delete_global_svn_pattern_calls_delete_ui(self):
620 620 model = VcsSettingsModel()
621 621 delete_ui_patch = mock.patch.object(model.global_settings, 'delete_ui')
622 622 with delete_ui_patch as delete_ui_mock:
623 623 model.delete_global_svn_pattern(123)
624 624 delete_ui_mock.assert_called_once_with(123)
625 625
626 626
627 627 class TestFilterUiSettings(object):
628 628 def test_settings_are_filtered(self):
629 629 model = VcsSettingsModel()
630 630 repo_settings = [
631 631 UiSetting('extensions', 'largefiles', '', True),
632 632 UiSetting('phases', 'publish', 'True', True),
633 633 UiSetting('hooks', 'changegroup.repo_size', 'hook', True),
634 634 UiSetting('hooks', 'changegroup.push_logger', 'hook', True),
635 635 UiSetting('hooks', 'outgoing.pull_logger', 'hook', True),
636 636 UiSetting(
637 637 'vcs_svn_branch', '84223c972204fa545ca1b22dac7bef5b68d7442d',
638 638 'test_branch', True),
639 639 UiSetting(
640 640 'vcs_svn_tag', '84229c972204fa545ca1b22dac7bef5b68d7442d',
641 641 'test_tag', True),
642 642 ]
643 643 non_repo_settings = [
644 644 UiSetting('largefiles', 'usercache', '/example/largefiles-store', True),
645 645 UiSetting('test', 'outgoing.pull_logger', 'hook', True),
646 646 UiSetting('hooks', 'test2', 'hook', True),
647 647 UiSetting(
648 648 'vcs_svn_repo', '84229c972204fa545ca1b22dac7bef5b68d7442d',
649 649 'test_tag', True),
650 650 ]
651 651 settings = repo_settings + non_repo_settings
652 652 filtered_settings = model._filter_ui_settings(settings)
653 653 assert sorted(filtered_settings) == sorted(repo_settings)
654 654
655 655
656 656 class TestFilterGeneralSettings(object):
657 657 def test_settings_are_filtered(self):
658 658 model = VcsSettingsModel()
659 659 settings = {
660 660 'rhodecode_abcde': 'value1',
661 661 'rhodecode_vwxyz': 'value2',
662 662 }
663 663 general_settings = {
664 664 'rhodecode_{}'.format(key): 'value'
665 665 for key in VcsSettingsModel.GENERAL_SETTINGS
666 666 }
667 667 settings.update(general_settings)
668 668
669 669 filtered_settings = model._filter_general_settings(general_settings)
670 670 assert sorted(filtered_settings) == sorted(general_settings)
671 671
672 672
673 673 class TestGetRepoUiSettings(object):
674 674 def test_global_uis_are_returned_when_no_repo_uis_found(
675 675 self, repo_stub):
676 676 model = VcsSettingsModel(repo=repo_stub.repo_name)
677 677 result = model.get_repo_ui_settings()
678 678 svn_sections = (
679 679 VcsSettingsModel.SVN_TAG_SECTION,
680 680 VcsSettingsModel.SVN_BRANCH_SECTION)
681 681 expected_result = [
682 682 s for s in model.global_settings.get_ui()
683 683 if s.section not in svn_sections]
684 684 assert sorted(result) == sorted(expected_result)
685 685
686 686 def test_repo_uis_are_overriding_global_uis(
687 687 self, repo_stub, settings_util):
688 688 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
689 689 settings_util.create_repo_rhodecode_ui(
690 690 repo_stub, section, 'repo', key=key, active=False)
691 691 model = VcsSettingsModel(repo=repo_stub.repo_name)
692 692 result = model.get_repo_ui_settings()
693 693 for setting in result:
694 694 locator = (setting.section, setting.key)
695 695 if locator in VcsSettingsModel.HOOKS_SETTINGS:
696 696 assert setting.value == 'repo'
697 697
698 698 assert setting.active is False
699 699
700 700 def test_global_svn_patterns_are_not_in_list(
701 701 self, repo_stub, settings_util):
702 702 svn_sections = (
703 703 VcsSettingsModel.SVN_TAG_SECTION,
704 704 VcsSettingsModel.SVN_BRANCH_SECTION)
705 705 for section in svn_sections:
706 706 settings_util.create_rhodecode_ui(
707 707 section, 'repo', key='deadbeef' + section, active=False)
708 708 model = VcsSettingsModel(repo=repo_stub.repo_name)
709 709 result = model.get_repo_ui_settings()
710 710 for setting in result:
711 711 assert setting.section not in svn_sections
712 712
713 713 def test_repo_uis_filtered_by_section_are_returned(
714 714 self, repo_stub, settings_util):
715 715 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
716 716 settings_util.create_repo_rhodecode_ui(
717 717 repo_stub, section, 'repo', key=key, active=False)
718 718 model = VcsSettingsModel(repo=repo_stub.repo_name)
719 719 section, key = VcsSettingsModel.HOOKS_SETTINGS[0]
720 720 result = model.get_repo_ui_settings(section=section)
721 721 for setting in result:
722 722 assert setting.section == section
723 723
724 724 def test_repo_uis_filtered_by_key_are_returned(
725 725 self, repo_stub, settings_util):
726 726 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
727 727 settings_util.create_repo_rhodecode_ui(
728 728 repo_stub, section, 'repo', key=key, active=False)
729 729 model = VcsSettingsModel(repo=repo_stub.repo_name)
730 730 section, key = VcsSettingsModel.HOOKS_SETTINGS[0]
731 731 result = model.get_repo_ui_settings(key=key)
732 732 for setting in result:
733 733 assert setting.key == key
734 734
735 735 def test_raises_exception_when_repository_is_not_specified(self):
736 736 model = VcsSettingsModel()
737 737 with pytest.raises(Exception) as exc_info:
738 738 model.get_repo_ui_settings()
739 739 assert exc_info.value.message == 'Repository is not specified'
740 740
741 741
742 742 class TestGetRepoGeneralSettings(object):
743 743 def test_global_settings_are_returned_when_no_repo_settings_found(
744 744 self, repo_stub):
745 745 model = VcsSettingsModel(repo=repo_stub.repo_name)
746 746 result = model.get_repo_general_settings()
747 747 expected_result = model.global_settings.get_all_settings()
748 748 assert sorted(result) == sorted(expected_result)
749 749
750 750 def test_repo_uis_are_overriding_global_uis(
751 751 self, repo_stub, settings_util):
752 752 for key in VcsSettingsModel.GENERAL_SETTINGS:
753 753 settings_util.create_repo_rhodecode_setting(
754 754 repo_stub, key, 'abcde', type_='unicode')
755 755 model = VcsSettingsModel(repo=repo_stub.repo_name)
756 756 result = model.get_repo_ui_settings()
757 757 for key in result:
758 758 if key in VcsSettingsModel.GENERAL_SETTINGS:
759 759 assert result[key] == 'abcde'
760 760
761 761 def test_raises_exception_when_repository_is_not_specified(self):
762 762 model = VcsSettingsModel()
763 763 with pytest.raises(Exception) as exc_info:
764 764 model.get_repo_general_settings()
765 765 assert exc_info.value.message == 'Repository is not specified'
766 766
767 767
768 768 class TestGetGlobalGeneralSettings(object):
769 769 def test_global_settings_are_returned(self, repo_stub):
770 770 model = VcsSettingsModel()
771 771 result = model.get_global_general_settings()
772 772 expected_result = model.global_settings.get_all_settings()
773 773 assert sorted(result) == sorted(expected_result)
774 774
775 775 def test_repo_uis_are_not_overriding_global_uis(
776 776 self, repo_stub, settings_util):
777 777 for key in VcsSettingsModel.GENERAL_SETTINGS:
778 778 settings_util.create_repo_rhodecode_setting(
779 779 repo_stub, key, 'abcde', type_='unicode')
780 780 model = VcsSettingsModel(repo=repo_stub.repo_name)
781 781 result = model.get_global_general_settings()
782 782 expected_result = model.global_settings.get_all_settings()
783 783 assert sorted(result) == sorted(expected_result)
784 784
785 785
786 786 class TestGetGlobalUiSettings(object):
787 787 def test_global_uis_are_returned(self, repo_stub):
788 788 model = VcsSettingsModel()
789 789 result = model.get_global_ui_settings()
790 790 expected_result = model.global_settings.get_ui()
791 791 assert sorted(result) == sorted(expected_result)
792 792
793 793 def test_repo_uis_are_not_overriding_global_uis(
794 794 self, repo_stub, settings_util):
795 795 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
796 796 settings_util.create_repo_rhodecode_ui(
797 797 repo_stub, section, 'repo', key=key, active=False)
798 798 model = VcsSettingsModel(repo=repo_stub.repo_name)
799 799 result = model.get_global_ui_settings()
800 800 expected_result = model.global_settings.get_ui()
801 801 assert sorted(result) == sorted(expected_result)
802 802
803 803 def test_ui_settings_filtered_by_section(
804 804 self, repo_stub, settings_util):
805 805 model = VcsSettingsModel(repo=repo_stub.repo_name)
806 806 section, key = VcsSettingsModel.HOOKS_SETTINGS[0]
807 807 result = model.get_global_ui_settings(section=section)
808 808 expected_result = model.global_settings.get_ui(section=section)
809 809 assert sorted(result) == sorted(expected_result)
810 810
811 811 def test_ui_settings_filtered_by_key(
812 812 self, repo_stub, settings_util):
813 813 model = VcsSettingsModel(repo=repo_stub.repo_name)
814 814 section, key = VcsSettingsModel.HOOKS_SETTINGS[0]
815 815 result = model.get_global_ui_settings(key=key)
816 816 expected_result = model.global_settings.get_ui(key=key)
817 817 assert sorted(result) == sorted(expected_result)
818 818
819 819
820 820 class TestGetGeneralSettings(object):
821 821 def test_global_settings_are_returned_when_inherited_is_true(
822 822 self, repo_stub, settings_util):
823 823 model = VcsSettingsModel(repo=repo_stub.repo_name)
824 824 model.inherit_global_settings = True
825 825 for key in VcsSettingsModel.GENERAL_SETTINGS:
826 826 settings_util.create_repo_rhodecode_setting(
827 827 repo_stub, key, 'abcde', type_='unicode')
828 828 result = model.get_general_settings()
829 829 expected_result = model.get_global_general_settings()
830 830 assert sorted(result) == sorted(expected_result)
831 831
832 832 def test_repo_settings_are_returned_when_inherited_is_false(
833 833 self, repo_stub, settings_util):
834 834 model = VcsSettingsModel(repo=repo_stub.repo_name)
835 835 model.inherit_global_settings = False
836 836 for key in VcsSettingsModel.GENERAL_SETTINGS:
837 837 settings_util.create_repo_rhodecode_setting(
838 838 repo_stub, key, 'abcde', type_='unicode')
839 839 result = model.get_general_settings()
840 840 expected_result = model.get_repo_general_settings()
841 841 assert sorted(result) == sorted(expected_result)
842 842
843 843 def test_global_settings_are_returned_when_no_repository_specified(self):
844 844 model = VcsSettingsModel()
845 845 result = model.get_general_settings()
846 846 expected_result = model.get_global_general_settings()
847 847 assert sorted(result) == sorted(expected_result)
848 848
849 849
850 850 class TestGetUiSettings(object):
851 851 def test_global_settings_are_returned_when_inherited_is_true(
852 852 self, repo_stub, settings_util):
853 853 model = VcsSettingsModel(repo=repo_stub.repo_name)
854 854 model.inherit_global_settings = True
855 855 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
856 856 settings_util.create_repo_rhodecode_ui(
857 857 repo_stub, section, 'repo', key=key, active=True)
858 858 result = model.get_ui_settings()
859 859 expected_result = model.get_global_ui_settings()
860 860 assert sorted(result) == sorted(expected_result)
861 861
862 862 def test_repo_settings_are_returned_when_inherited_is_false(
863 863 self, repo_stub, settings_util):
864 864 model = VcsSettingsModel(repo=repo_stub.repo_name)
865 865 model.inherit_global_settings = False
866 866 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
867 867 settings_util.create_repo_rhodecode_ui(
868 868 repo_stub, section, 'repo', key=key, active=True)
869 869 result = model.get_ui_settings()
870 870 expected_result = model.get_repo_ui_settings()
871 871 assert sorted(result) == sorted(expected_result)
872 872
873 873 def test_repo_settings_filtered_by_section_and_key(self, repo_stub):
874 874 model = VcsSettingsModel(repo=repo_stub.repo_name)
875 875 model.inherit_global_settings = False
876 876 args = ('section', 'key')
877 877 with mock.patch.object(model, 'get_repo_ui_settings') as settings_mock:
878 878 model.get_ui_settings(*args)
879 879 settings_mock.assert_called_once_with(*args)
880 880
881 881 def test_global_settings_filtered_by_section_and_key(self):
882 882 model = VcsSettingsModel()
883 883 args = ('section', 'key')
884 884 with mock.patch.object(model, 'get_global_ui_settings') as (
885 885 settings_mock):
886 886 model.get_ui_settings(*args)
887 887 settings_mock.assert_called_once_with(*args)
888 888
889 889 def test_global_settings_are_returned_when_no_repository_specified(self):
890 890 model = VcsSettingsModel()
891 891 result = model.get_ui_settings()
892 892 expected_result = model.get_global_ui_settings()
893 893 assert sorted(result) == sorted(expected_result)
894 894
895 895
896 896 class TestGetSvnPatterns(object):
897 897 def test_repo_settings_filtered_by_section_and_key(self, repo_stub):
898 898 model = VcsSettingsModel(repo=repo_stub.repo_name)
899 899 args = ('section', )
900 900 with mock.patch.object(model, 'get_repo_ui_settings') as settings_mock:
901 901 model.get_svn_patterns(*args)
902 902 settings_mock.assert_called_once_with(*args)
903 903
904 904 def test_global_settings_filtered_by_section_and_key(self):
905 905 model = VcsSettingsModel()
906 906 args = ('section', )
907 907 with mock.patch.object(model, 'get_global_ui_settings') as (
908 908 settings_mock):
909 909 model.get_svn_patterns(*args)
910 910 settings_mock.assert_called_once_with(*args)
911 911
912 912
913 913 class TestGetReposLocation(object):
914 914 def test_returns_repos_location(self, repo_stub):
915 915 model = VcsSettingsModel()
916 916
917 917 result_mock = mock.Mock()
918 918 result_mock.ui_value = '/tmp'
919 919
920 920 with mock.patch.object(model, 'global_settings') as settings_mock:
921 921 settings_mock.get_ui_by_key.return_value = result_mock
922 922 result = model.get_repos_location()
923 923
924 924 settings_mock.get_ui_by_key.assert_called_once_with('/')
925 925 assert result == '/tmp'
926 926
927 927
928 928 class TestCreateOrUpdateRepoSettings(object):
929 929 FORM_DATA = {
930 930 'inherit_global_settings': False,
931 931 'hooks_changegroup_repo_size': False,
932 932 'hooks_changegroup_push_logger': False,
933 933 'hooks_outgoing_pull_logger': False,
934 934 'extensions_largefiles': False,
935 935 'largefiles_usercache': '/example/largefiles-store',
936 'phases_publish': 'false',
936 'phases_publish': 'False',
937 937 'rhodecode_pr_merge_enabled': False,
938 938 'rhodecode_use_outdated_comments': False,
939 939 'new_svn_branch': '',
940 940 'new_svn_tag': ''
941 941 }
942 942
943 943 def test_get_raises_exception_when_repository_not_specified(self):
944 944 model = VcsSettingsModel()
945 945 with pytest.raises(Exception) as exc_info:
946 946 model.create_or_update_repo_settings(data=self.FORM_DATA)
947 947 assert exc_info.value.message == 'Repository is not specified'
948 948
949 949 def test_only_svn_settings_are_updated_when_type_is_svn(self, backend_svn):
950 950 repo = backend_svn.create_repo()
951 951 model = VcsSettingsModel(repo=repo)
952 952 with self._patch_model(model) as mocks:
953 953 model.create_or_update_repo_settings(
954 954 data=self.FORM_DATA, inherit_global_settings=False)
955 955 mocks['create_repo_svn_settings'].assert_called_once_with(
956 956 self.FORM_DATA)
957 957 non_called_methods = (
958 958 'create_or_update_repo_hook_settings',
959 959 'create_or_update_repo_pr_settings',
960 960 'create_or_update_repo_hg_settings')
961 961 for method in non_called_methods:
962 962 assert mocks[method].call_count == 0
963 963
964 964 def test_non_svn_settings_are_updated_when_type_is_hg(self, backend_hg):
965 965 repo = backend_hg.create_repo()
966 966 model = VcsSettingsModel(repo=repo)
967 967 with self._patch_model(model) as mocks:
968 968 model.create_or_update_repo_settings(
969 969 data=self.FORM_DATA, inherit_global_settings=False)
970 970
971 971 assert mocks['create_repo_svn_settings'].call_count == 0
972 972 called_methods = (
973 973 'create_or_update_repo_hook_settings',
974 974 'create_or_update_repo_pr_settings',
975 975 'create_or_update_repo_hg_settings')
976 976 for method in called_methods:
977 977 mocks[method].assert_called_once_with(self.FORM_DATA)
978 978
979 979 def test_non_svn_and_hg_settings_are_updated_when_type_is_git(
980 980 self, backend_git):
981 981 repo = backend_git.create_repo()
982 982 model = VcsSettingsModel(repo=repo)
983 983 with self._patch_model(model) as mocks:
984 984 model.create_or_update_repo_settings(
985 985 data=self.FORM_DATA, inherit_global_settings=False)
986 986
987 987 assert mocks['create_repo_svn_settings'].call_count == 0
988 988 called_methods = (
989 989 'create_or_update_repo_hook_settings',
990 990 'create_or_update_repo_pr_settings')
991 991 non_called_methods = (
992 992 'create_repo_svn_settings',
993 993 'create_or_update_repo_hg_settings'
994 994 )
995 995 for method in called_methods:
996 996 mocks[method].assert_called_once_with(self.FORM_DATA)
997 997 for method in non_called_methods:
998 998 assert mocks[method].call_count == 0
999 999
1000 1000 def test_no_methods_are_called_when_settings_are_inherited(
1001 1001 self, backend):
1002 1002 repo = backend.create_repo()
1003 1003 model = VcsSettingsModel(repo=repo)
1004 1004 with self._patch_model(model) as mocks:
1005 1005 model.create_or_update_repo_settings(
1006 1006 data=self.FORM_DATA, inherit_global_settings=True)
1007 1007 for method_name in mocks:
1008 1008 assert mocks[method_name].call_count == 0
1009 1009
1010 1010 def test_cache_is_marked_for_invalidation(self, repo_stub):
1011 1011 model = VcsSettingsModel(repo=repo_stub)
1012 1012 invalidation_patcher = mock.patch(
1013 1013 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1014 1014 with invalidation_patcher as invalidation_mock:
1015 1015 model.create_or_update_repo_settings(
1016 1016 data=self.FORM_DATA, inherit_global_settings=True)
1017 1017 invalidation_mock.assert_called_once_with(
1018 1018 repo_stub.repo_name, delete=True)
1019 1019
1020 1020 def test_inherit_flag_is_saved(self, repo_stub):
1021 1021 model = VcsSettingsModel(repo=repo_stub)
1022 1022 model.inherit_global_settings = True
1023 1023 with self._patch_model(model):
1024 1024 model.create_or_update_repo_settings(
1025 1025 data=self.FORM_DATA, inherit_global_settings=False)
1026 1026 assert model.inherit_global_settings is False
1027 1027
1028 1028 def _patch_model(self, model):
1029 1029 return mock.patch.multiple(
1030 1030 model,
1031 1031 create_repo_svn_settings=mock.DEFAULT,
1032 1032 create_or_update_repo_hook_settings=mock.DEFAULT,
1033 1033 create_or_update_repo_pr_settings=mock.DEFAULT,
1034 1034 create_or_update_repo_hg_settings=mock.DEFAULT)
General Comments 0
You need to be logged in to leave comments. Login now