# -*- coding: utf-8 -*- # Copyright (C) 2010-2016 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import urllib import mock import pytest from rhodecode.lib import auth from rhodecode.lib import vcs from rhodecode.lib.utils2 import safe_str, str2bool from rhodecode.lib.vcs.exceptions import RepositoryRequirementError from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\ Permission from rhodecode.model.meta import Session from rhodecode.model.repo import RepoModel from rhodecode.model.repo_group import RepoGroupModel from rhodecode.model.settings import SettingsModel, VcsSettingsModel from rhodecode.model.user import UserModel from rhodecode.tests import ( login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO, TESTS_TMP_PATH, logout_user_session) from rhodecode.tests.fixture import Fixture, error_function from rhodecode.tests.utils import AssertResponse fixture = Fixture() @pytest.mark.usefixtures("app") class TestAdminRepos: def test_index(self): self.app.get(url('repos')) def test_create_page_restricted(self, autologin_user, backend): with mock.patch('rhodecode.BACKENDS', {'git': 'git'}): response = self.app.get(url('new_repo'), status=200) assert_response = AssertResponse(response) element = assert_response.get_element('#repo_type') assert element.text_content() == '\ngit\n' def test_create_page_non_restricted(self, autologin_user, backend): response = self.app.get(url('new_repo'), status=200) assert_response = AssertResponse(response) assert_response.element_contains('#repo_type', 'git') assert_response.element_contains('#repo_type', 'svn') assert_response.element_contains('#repo_type', 'hg') @pytest.mark.parametrize("suffix", [u'', u''], ids=['', 'non-ascii']) def test_create(self, autologin_user, backend, suffix, csrf_token): repo_name_unicode = backend.new_repo_name(suffix=suffix) repo_name = repo_name_unicode.encode('utf8') description_unicode = u'description for newly created repo' + suffix description = description_unicode.encode('utf8') self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, csrf_token=csrf_token), status=302 ) self.assert_repository_is_created_correctly( repo_name, description, backend) def test_create_numeric(self, autologin_user, backend, csrf_token): numeric_repo = '1234' repo_name = numeric_repo description = 'description for newly created repo' + numeric_repo self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, csrf_token=csrf_token)) self.assert_repository_is_created_correctly( repo_name, description, backend) @pytest.mark.parametrize("suffix", [u'', u'ąćę'], ids=['', 'non-ascii']) def test_create_in_group( self, autologin_user, backend, suffix, csrf_token): # create GROUP group_name = 'sometest_%s' % backend.alias gr = RepoGroupModel().create(group_name=group_name, group_description='test', owner=TEST_USER_ADMIN_LOGIN) Session().commit() repo_name = u'ingroup' + suffix repo_name_full = RepoGroup.url_sep().join( [group_name, repo_name]) description = u'description for newly created repo' self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=safe_str(repo_name), repo_type=backend.alias, repo_description=description, repo_group=gr.group_id, csrf_token=csrf_token)) # TODO: johbo: Cleanup work to fixture try: self.assert_repository_is_created_correctly( repo_name_full, description, backend) new_repo = RepoModel().get_by_repo_name(repo_name_full) inherited_perms = UserRepoToPerm.query().filter( UserRepoToPerm.repository_id == new_repo.repo_id).all() assert len(inherited_perms) == 1 finally: RepoModel().delete(repo_name_full) RepoGroupModel().delete(group_name) Session().commit() def test_create_in_group_numeric( self, autologin_user, backend, csrf_token): # create GROUP group_name = 'sometest_%s' % backend.alias gr = RepoGroupModel().create(group_name=group_name, group_description='test', owner=TEST_USER_ADMIN_LOGIN) Session().commit() repo_name = '12345' repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) description = 'description for newly created repo' self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, repo_group=gr.group_id, csrf_token=csrf_token)) # TODO: johbo: Cleanup work to fixture try: self.assert_repository_is_created_correctly( repo_name_full, description, backend) new_repo = RepoModel().get_by_repo_name(repo_name_full) inherited_perms = UserRepoToPerm.query()\ .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all() assert len(inherited_perms) == 1 finally: RepoModel().delete(repo_name_full) RepoGroupModel().delete(group_name) Session().commit() def test_create_in_group_without_needed_permissions(self, backend): session = login_user_session( self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) csrf_token = auth.get_csrf_token(session) # revoke user_model = UserModel() # disable fork and create on default user user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') # disable on regular user user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') Session().commit() # create GROUP group_name = 'reg_sometest_%s' % backend.alias gr = RepoGroupModel().create(group_name=group_name, group_description='test', owner=TEST_USER_ADMIN_LOGIN) Session().commit() group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias gr_allowed = RepoGroupModel().create( group_name=group_name_allowed, group_description='test', owner=TEST_USER_REGULAR_LOGIN) Session().commit() repo_name = 'ingroup' description = 'description for newly created repo' response = self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, repo_group=gr.group_id, csrf_token=csrf_token)) response.mustcontain('Invalid value') # user is allowed to create in this group repo_name = 'ingroup' repo_name_full = RepoGroup.url_sep().join( [group_name_allowed, repo_name]) description = 'description for newly created repo' response = self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, repo_group=gr_allowed.group_id, csrf_token=csrf_token)) # TODO: johbo: Cleanup in pytest fixture try: self.assert_repository_is_created_correctly( repo_name_full, description, backend) new_repo = RepoModel().get_by_repo_name(repo_name_full) inherited_perms = UserRepoToPerm.query().filter( UserRepoToPerm.repository_id == new_repo.repo_id).all() assert len(inherited_perms) == 1 assert repo_on_filesystem(repo_name_full) finally: RepoModel().delete(repo_name_full) RepoGroupModel().delete(group_name) RepoGroupModel().delete(group_name_allowed) Session().commit() def test_create_in_group_inherit_permissions(self, autologin_user, backend, csrf_token): # create GROUP group_name = 'sometest_%s' % backend.alias gr = RepoGroupModel().create(group_name=group_name, group_description='test', owner=TEST_USER_ADMIN_LOGIN) perm = Permission.get_by_key('repository.write') RepoGroupModel().grant_user_permission( gr, TEST_USER_REGULAR_LOGIN, perm) # add repo permissions Session().commit() repo_name = 'ingroup_inherited_%s' % backend.alias repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) description = 'description for newly created repo' self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, repo_group=gr.group_id, repo_copy_permissions=True, csrf_token=csrf_token)) # TODO: johbo: Cleanup to pytest fixture try: self.assert_repository_is_created_correctly( repo_name_full, description, backend) except Exception: RepoGroupModel().delete(group_name) Session().commit() raise # check if inherited permissions are applied new_repo = RepoModel().get_by_repo_name(repo_name_full) inherited_perms = UserRepoToPerm.query().filter( UserRepoToPerm.repository_id == new_repo.repo_id).all() assert len(inherited_perms) == 2 assert TEST_USER_REGULAR_LOGIN in [ x.user.username for x in inherited_perms] assert 'repository.write' in [ x.permission.permission_name for x in inherited_perms] RepoModel().delete(repo_name_full) RepoGroupModel().delete(group_name) Session().commit() @pytest.mark.xfail_backends( "git", "hg", reason="Missing reposerver support") def test_create_with_clone_uri(self, autologin_user, backend, reposerver, csrf_token): source_repo = backend.create_repo(number_of_commits=2) source_repo_name = source_repo.repo_name reposerver.serve(source_repo.scm_instance()) repo_name = backend.new_repo_name() response = self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description='', clone_uri=reposerver.url, csrf_token=csrf_token), status=302) # Should be redirected to the creating page response.mustcontain('repo_creating') # Expecting that both repositories have same history source_repo = RepoModel().get_by_repo_name(source_repo_name) source_vcs = source_repo.scm_instance() repo = RepoModel().get_by_repo_name(repo_name) repo_vcs = repo.scm_instance() assert source_vcs[0].message == repo_vcs[0].message assert source_vcs.count() == repo_vcs.count() assert source_vcs.commit_ids == repo_vcs.commit_ids @pytest.mark.xfail_backends("svn", reason="Depends on import support") def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend, csrf_token): repo_name = backend.new_repo_name() description = 'description for newly created repo' response = self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, clone_uri='http://repo.invalid/repo', csrf_token=csrf_token)) response.mustcontain('invalid clone url') @pytest.mark.xfail_backends("svn", reason="Depends on import support") def test_create_remote_repo_wrong_clone_uri_hg_svn( self, autologin_user, backend, csrf_token): repo_name = backend.new_repo_name() description = 'description for newly created repo' response = self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, clone_uri='svn+http://svn.invalid/repo', csrf_token=csrf_token)) response.mustcontain('invalid clone url') @pytest.mark.parametrize("suffix", [u'', u'ąęł'], ids=['', 'non-ascii']) def test_delete(self, autologin_user, backend, suffix, csrf_token): repo = backend.create_repo(name_suffix=suffix) repo_name = repo.repo_name response = self.app.post(url('repo', repo_name=repo_name), params={'_method': 'delete', 'csrf_token': csrf_token}) assert_session_flash(response, 'Deleted repository %s' % (repo_name)) response.follow() # check if repo was deleted from db assert RepoModel().get_by_repo_name(repo_name) is None assert not repo_on_filesystem(repo_name) def test_show(self, autologin_user, backend): self.app.get(url('repo', repo_name=backend.repo_name)) def test_edit(self, backend, autologin_user): self.app.get(url('edit_repo', repo_name=backend.repo_name)) def test_edit_accessible_when_missing_requirements( self, backend_hg, autologin_user): scm_patcher = mock.patch.object( Repository, 'scm_instance', side_effect=RepositoryRequirementError) with scm_patcher: self.app.get(url('edit_repo', repo_name=backend_hg.repo_name)) def test_set_private_flag_sets_default_to_none( self, autologin_user, backend, csrf_token): # initially repository perm should be read perm = _get_permission_for_user(user='default', repo=backend.repo_name) assert len(perm) == 1 assert perm[0].permission.permission_name == 'repository.read' assert not backend.repo.private response = self.app.post( url('repo', repo_name=backend.repo_name), fixture._get_repo_create_params( repo_private=1, repo_name=backend.repo_name, repo_type=backend.alias, user=TEST_USER_ADMIN_LOGIN, _method='put', csrf_token=csrf_token)) assert_session_flash( response, msg='Repository %s updated successfully' % (backend.repo_name)) assert backend.repo.private # now the repo default permission should be None perm = _get_permission_for_user(user='default', repo=backend.repo_name) assert len(perm) == 1 assert perm[0].permission.permission_name == 'repository.none' response = self.app.post( url('repo', repo_name=backend.repo_name), fixture._get_repo_create_params( repo_private=False, repo_name=backend.repo_name, repo_type=backend.alias, user=TEST_USER_ADMIN_LOGIN, _method='put', csrf_token=csrf_token)) assert_session_flash( response, msg='Repository %s updated successfully' % (backend.repo_name)) assert not backend.repo.private # we turn off private now the repo default permission should stay None perm = _get_permission_for_user(user='default', repo=backend.repo_name) assert len(perm) == 1 assert perm[0].permission.permission_name == 'repository.none' # update this permission back perm[0].permission = Permission.get_by_key('repository.read') Session().add(perm[0]) Session().commit() def test_default_user_cannot_access_private_repo_in_a_group( self, autologin_user, user_util, backend, csrf_token): group = user_util.create_repo_group() repo = backend.create_repo( repo_private=True, repo_group=group, repo_copy_permissions=True) permissions = _get_permission_for_user( user='default', repo=repo.repo_name) assert len(permissions) == 1 assert permissions[0].permission.permission_name == 'repository.none' assert permissions[0].repository.private is True def test_set_repo_fork_has_no_self_id(self, autologin_user, backend): repo = backend.repo response = self.app.get( url('edit_repo_advanced', repo_name=backend.repo_name)) opt = """""" % repo.repo_id response.mustcontain(no=[opt]) def test_set_fork_of_target_repo( self, autologin_user, backend, csrf_token): target_repo = 'target_%s' % backend.alias fixture.create_repo(target_repo, repo_type=backend.alias) repo2 = Repository.get_by_repo_name(target_repo) response = self.app.post( url('edit_repo_advanced_fork', repo_name=backend.repo_name), params={'id_fork_of': repo2.repo_id, '_method': 'put', 'csrf_token': csrf_token}) repo = Repository.get_by_repo_name(backend.repo_name) repo2 = Repository.get_by_repo_name(target_repo) assert_session_flash( response, 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name)) assert repo.fork == repo2 response = response.follow() # check if given repo is selected opt = 'This repository is a fork of %s' % ( url('summary_home', repo_name=repo2.repo_name), repo2.repo_name) response.mustcontain(opt) fixture.destroy_repo(target_repo, forks='detach') @pytest.mark.backends("hg", "git") def test_set_fork_of_other_type_repo(self, autologin_user, backend, csrf_token): TARGET_REPO_MAP = { 'git': { 'type': 'hg', 'repo_name': HG_REPO}, 'hg': { 'type': 'git', 'repo_name': GIT_REPO}, } target_repo = TARGET_REPO_MAP[backend.alias] repo2 = Repository.get_by_repo_name(target_repo['repo_name']) response = self.app.post( url('edit_repo_advanced_fork', repo_name=backend.repo_name), params={'id_fork_of': repo2.repo_id, '_method': 'put', 'csrf_token': csrf_token}) assert_session_flash( response, 'Cannot set repository as fork of repository with other type') def test_set_fork_of_none(self, autologin_user, backend, csrf_token): # mark it as None response = self.app.post( url('edit_repo_advanced_fork', repo_name=backend.repo_name), params={'id_fork_of': None, '_method': 'put', 'csrf_token': csrf_token}) assert_session_flash( response, 'Marked repo %s as fork of %s' % (backend.repo_name, "Nothing")) assert backend.repo.fork is None def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token): repo = Repository.get_by_repo_name(backend.repo_name) response = self.app.post( url('edit_repo_advanced_fork', repo_name=backend.repo_name), params={'id_fork_of': repo.repo_id, '_method': 'put', 'csrf_token': csrf_token}) assert_session_flash( response, 'An error occurred during this operation') def test_create_on_top_level_without_permissions(self, backend): session = login_user_session( self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) csrf_token = auth.get_csrf_token(session) # revoke user_model = UserModel() # disable fork and create on default user user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') # disable on regular user user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') Session().commit() repo_name = backend.new_repo_name() description = 'description for newly created repo' response = self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, csrf_token=csrf_token)) response.mustcontain( u"You do not have the permission to store repositories in " u"the root location.") @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function) def test_create_repo_when_filesystem_op_fails( self, autologin_user, backend, csrf_token): repo_name = backend.new_repo_name() description = 'description for newly created repo' response = self.app.post( url('repos'), fixture._get_repo_create_params( repo_private=False, repo_name=repo_name, repo_type=backend.alias, repo_description=description, csrf_token=csrf_token)) assert_session_flash( response, 'Error creating repository %s' % repo_name) # repo must not be in db assert backend.repo is None # repo must not be in filesystem ! assert not repo_on_filesystem(repo_name) def assert_repository_is_created_correctly( self, repo_name, description, backend): repo_name_utf8 = repo_name.encode('utf-8') # run the check page that triggers the flash message response = self.app.get(url('repo_check_home', repo_name=repo_name)) assert response.json == {u'result': True} assert_session_flash( response, u'Created repository %s' % (urllib.quote(repo_name_utf8), repo_name)) # test if the repo was created in the database new_repo = RepoModel().get_by_repo_name(repo_name) assert new_repo.repo_name == repo_name assert new_repo.description == description # test if the repository is visible in the list ? response = self.app.get(url('summary_home', repo_name=repo_name)) response.mustcontain(repo_name) response.mustcontain(backend.alias) assert repo_on_filesystem(repo_name) @pytest.mark.usefixtures("app") class TestVcsSettings(object): FORM_DATA = { 'inherit_global_settings': False, 'hooks_changegroup_repo_size': False, 'hooks_changegroup_push_logger': False, 'hooks_outgoing_pull_logger': False, 'extensions_largefiles': False, 'phases_publish': 'false', 'rhodecode_pr_merge_enabled': False, 'rhodecode_use_outdated_comments': False, 'new_svn_branch': '', 'new_svn_tag': '' } @pytest.mark.skip_backends('svn') def test_global_settings_initial_values(self, autologin_user, backend): repo_name = backend.repo_name response = self.app.get(url('repo_vcs_settings', repo_name=repo_name)) expected_settings = ( 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled', 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger', 'hooks_outgoing_pull_logger' ) for setting in expected_settings: self.assert_repo_value_equals_global_value(response, setting) def test_show_settings_requires_repo_admin_permission( self, backend, user_util, settings_util): repo = backend.create_repo() repo_name = repo.repo_name user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN) user_util.grant_user_permission_to_repo(repo, user, 'repository.admin') login_user_session( self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200) def test_inherit_global_settings_flag_is_true_by_default( self, autologin_user, backend): repo_name = backend.repo_name response = self.app.get(url('repo_vcs_settings', repo_name=repo_name)) assert_response = AssertResponse(response) element = assert_response.get_element('#inherit_global_settings') assert element.checked @pytest.mark.parametrize('checked_value', [True, False]) def test_inherit_global_settings_value( self, autologin_user, backend, checked_value, settings_util): repo = backend.create_repo() repo_name = repo.repo_name settings_util.create_repo_rhodecode_setting( repo, 'inherit_vcs_settings', checked_value, 'bool') response = self.app.get(url('repo_vcs_settings', repo_name=repo_name)) assert_response = AssertResponse(response) element = assert_response.get_element('#inherit_global_settings') assert element.checked == checked_value @pytest.mark.skip_backends('svn') def test_hooks_settings_are_created( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: for section, key in VcsSettingsModel.HOOKS_SETTINGS: ui = settings.get_ui_by_section_and_key(section, key) assert ui.ui_active is False finally: self._cleanup_repo_settings(settings) def test_hooks_settings_are_not_created_for_svn( self, autologin_user, backend_svn, csrf_token): repo_name = backend_svn.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: for section, key in VcsSettingsModel.HOOKS_SETTINGS: ui = settings.get_ui_by_section_and_key(section, key) assert ui is None finally: self._cleanup_repo_settings(settings) @pytest.mark.skip_backends('svn') def test_hooks_settings_are_updated( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name settings = SettingsModel(repo=repo_name) for section, key in VcsSettingsModel.HOOKS_SETTINGS: settings.create_ui_section_value(section, '', key=key, active=True) data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) try: for section, key in VcsSettingsModel.HOOKS_SETTINGS: ui = settings.get_ui_by_section_and_key(section, key) assert ui.ui_active is False finally: self._cleanup_repo_settings(settings) def test_hooks_settings_are_not_updated_for_svn( self, autologin_user, backend_svn, csrf_token): repo_name = backend_svn.repo_name settings = SettingsModel(repo=repo_name) for section, key in VcsSettingsModel.HOOKS_SETTINGS: settings.create_ui_section_value(section, '', key=key, active=True) data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) try: for section, key in VcsSettingsModel.HOOKS_SETTINGS: ui = settings.get_ui_by_section_and_key(section, key) assert ui.ui_active is True finally: self._cleanup_repo_settings(settings) @pytest.mark.skip_backends('svn') def test_pr_settings_are_created( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: for name in VcsSettingsModel.GENERAL_SETTINGS: setting = settings.get_setting_by_name(name) assert setting.app_settings_value is False finally: self._cleanup_repo_settings(settings) def test_pr_settings_are_not_created_for_svn( self, autologin_user, backend_svn, csrf_token): repo_name = backend_svn.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: for name in VcsSettingsModel.GENERAL_SETTINGS: setting = settings.get_setting_by_name(name) assert setting is None finally: self._cleanup_repo_settings(settings) def test_pr_settings_creation_requires_repo_admin_permission( self, backend, user_util, settings_util, csrf_token): repo = backend.create_repo() repo_name = repo.repo_name user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN) logout_user_session(self.app, csrf_token) session = login_user_session( self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) new_csrf_token = auth.get_csrf_token(session) user_util.grant_user_permission_to_repo(repo, user, 'repository.admin') data = self.FORM_DATA.copy() data['csrf_token'] = new_csrf_token settings = SettingsModel(repo=repo_name) try: self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) finally: self._cleanup_repo_settings(settings) @pytest.mark.skip_backends('svn') def test_pr_settings_are_updated( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name settings = SettingsModel(repo=repo_name) for name in VcsSettingsModel.GENERAL_SETTINGS: settings.create_or_update_setting(name, True, 'bool') data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) try: for name in VcsSettingsModel.GENERAL_SETTINGS: setting = settings.get_setting_by_name(name) assert setting.app_settings_value is False finally: self._cleanup_repo_settings(settings) def test_pr_settings_are_not_updated_for_svn( self, autologin_user, backend_svn, csrf_token): repo_name = backend_svn.repo_name settings = SettingsModel(repo=repo_name) for name in VcsSettingsModel.GENERAL_SETTINGS: settings.create_or_update_setting(name, True, 'bool') data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) try: for name in VcsSettingsModel.GENERAL_SETTINGS: setting = settings.get_setting_by_name(name) assert setting.app_settings_value is True finally: self._cleanup_repo_settings(settings) def test_svn_settings_are_created( self, autologin_user, backend_svn, csrf_token, settings_util): repo_name = backend_svn.repo_name data = self.FORM_DATA.copy() data['new_svn_tag'] = 'svn-tag' data['new_svn_branch'] = 'svn-branch' data['csrf_token'] = csrf_token # Create few global settings to make sure that uniqueness validators # are not triggered settings_util.create_rhodecode_ui( VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch') settings_util.create_rhodecode_ui( VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag') self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: svn_branches = settings.get_ui_by_section( VcsSettingsModel.SVN_BRANCH_SECTION) svn_branch_names = [b.ui_value for b in svn_branches] svn_tags = settings.get_ui_by_section( VcsSettingsModel.SVN_TAG_SECTION) svn_tag_names = [b.ui_value for b in svn_tags] assert 'svn-branch' in svn_branch_names assert 'svn-tag' in svn_tag_names finally: self._cleanup_repo_settings(settings) def test_svn_settings_are_unique( self, autologin_user, backend_svn, csrf_token, settings_util): repo = backend_svn.repo repo_name = repo.repo_name data = self.FORM_DATA.copy() data['new_svn_tag'] = 'test_tag' data['new_svn_branch'] = 'test_branch' data['csrf_token'] = csrf_token settings_util.create_repo_rhodecode_ui( repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch') settings_util.create_repo_rhodecode_ui( repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag') response = self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=200) response.mustcontain('Pattern already exists') def test_svn_settings_with_empty_values_are_not_created( self, autologin_user, backend_svn, csrf_token): repo_name = backend_svn.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: svn_branches = settings.get_ui_by_section( VcsSettingsModel.SVN_BRANCH_SECTION) svn_tags = settings.get_ui_by_section( VcsSettingsModel.SVN_TAG_SECTION) assert len(svn_branches) == 0 assert len(svn_tags) == 0 finally: self._cleanup_repo_settings(settings) def test_svn_settings_are_shown_for_svn_repository( self, autologin_user, backend_svn, csrf_token): repo_name = backend_svn.repo_name response = self.app.get( url('repo_vcs_settings', repo_name=repo_name), status=200) response.mustcontain('Subversion Settings') @pytest.mark.skip_backends('svn') def test_svn_settings_are_not_created_for_not_svn_repository( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: svn_branches = settings.get_ui_by_section( VcsSettingsModel.SVN_BRANCH_SECTION) svn_tags = settings.get_ui_by_section( VcsSettingsModel.SVN_TAG_SECTION) assert len(svn_branches) == 0 assert len(svn_tags) == 0 finally: self._cleanup_repo_settings(settings) @pytest.mark.skip_backends('svn') def test_svn_settings_are_shown_only_for_svn_repository( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name response = self.app.get( url('repo_vcs_settings', repo_name=repo_name), status=200) response.mustcontain(no='Subversion Settings') def test_hg_settings_are_created( self, autologin_user, backend_hg, csrf_token): repo_name = backend_hg.repo_name data = self.FORM_DATA.copy() data['new_svn_tag'] = 'svn-tag' data['new_svn_branch'] = 'svn-branch' data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: largefiles_ui = settings.get_ui_by_section_and_key( 'extensions', 'largefiles') assert largefiles_ui.ui_active is False phases_ui = settings.get_ui_by_section_and_key( 'phases', 'publish') assert str2bool(phases_ui.ui_value) is False finally: self._cleanup_repo_settings(settings) def test_hg_settings_are_updated( self, autologin_user, backend_hg, csrf_token): repo_name = backend_hg.repo_name settings = SettingsModel(repo=repo_name) settings.create_ui_section_value( 'extensions', '', key='largefiles', active=True) settings.create_ui_section_value( 'phases', '1', key='publish', active=True) data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) try: largefiles_ui = settings.get_ui_by_section_and_key( 'extensions', 'largefiles') assert largefiles_ui.ui_active is False phases_ui = settings.get_ui_by_section_and_key( 'phases', 'publish') assert str2bool(phases_ui.ui_value) is False finally: self._cleanup_repo_settings(settings) def test_hg_settings_are_shown_for_hg_repository( self, autologin_user, backend_hg, csrf_token): repo_name = backend_hg.repo_name response = self.app.get( url('repo_vcs_settings', repo_name=repo_name), status=200) response.mustcontain('Mercurial Settings') @pytest.mark.skip_backends('hg') def test_hg_settings_are_created_only_for_hg_repository( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) try: largefiles_ui = settings.get_ui_by_section_and_key( 'extensions', 'largefiles') assert largefiles_ui is None phases_ui = settings.get_ui_by_section_and_key( 'phases', 'publish') assert phases_ui is None finally: self._cleanup_repo_settings(settings) @pytest.mark.skip_backends('hg') def test_hg_settings_are_shown_only_for_hg_repository( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name response = self.app.get( url('repo_vcs_settings', repo_name=repo_name), status=200) response.mustcontain(no='Mercurial Settings') @pytest.mark.skip_backends('hg') def test_hg_settings_are_updated_only_for_hg_repository( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name settings = SettingsModel(repo=repo_name) settings.create_ui_section_value( 'extensions', '', key='largefiles', active=True) settings.create_ui_section_value( 'phases', '1', key='publish', active=True) data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) try: largefiles_ui = settings.get_ui_by_section_and_key( 'extensions', 'largefiles') assert largefiles_ui.ui_active is True phases_ui = settings.get_ui_by_section_and_key( 'phases', 'publish') assert phases_ui.ui_value == '1' finally: self._cleanup_repo_settings(settings) def test_per_repo_svn_settings_are_displayed( self, autologin_user, backend_svn, settings_util): repo = backend_svn.create_repo() repo_name = repo.repo_name branches = [ settings_util.create_repo_rhodecode_ui( repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'branch_{}'.format(i)) for i in range(10)] tags = [ settings_util.create_repo_rhodecode_ui( repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i)) for i in range(10)] response = self.app.get( url('repo_vcs_settings', repo_name=repo_name), status=200) assert_response = AssertResponse(response) for branch in branches: css_selector = '[name=branch_value_{}]'.format(branch.ui_id) element = assert_response.get_element(css_selector) assert element.value == branch.ui_value for tag in tags: css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id) element = assert_response.get_element(css_selector) assert element.value == tag.ui_value def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn( self, autologin_user, backend_svn, settings_util): repo = backend_svn.create_repo() repo_name = repo.repo_name response = self.app.get( url('repo_vcs_settings', repo_name=repo_name), status=200) response.mustcontain(no='') response.mustcontain(no='') def test_inherit_global_settings_value_is_saved( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token data['inherit_global_settings'] = True self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) vcs_settings = VcsSettingsModel(repo=repo_name) try: assert vcs_settings.inherit_global_settings is True finally: self._cleanup_repo_settings(settings) def test_repo_cache_is_invalidated_when_settings_are_updated( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token data['inherit_global_settings'] = True settings = SettingsModel(repo=repo_name) invalidation_patcher = mock.patch( 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation') with invalidation_patcher as invalidation_mock: self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) try: invalidation_mock.assert_called_once_with(repo_name, delete=True) finally: self._cleanup_repo_settings(settings) def test_other_settings_not_saved_inherit_global_settings_is_true( self, autologin_user, backend, csrf_token): repo_name = backend.repo_name data = self.FORM_DATA.copy() data['csrf_token'] = csrf_token data['inherit_global_settings'] = True self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, status=302) settings = SettingsModel(repo=repo_name) ui_settings = ( VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS) vcs_settings = [] try: for section, key in ui_settings: ui = settings.get_ui_by_section_and_key(section, key) if ui: vcs_settings.append(ui) vcs_settings.extend(settings.get_ui_by_section( VcsSettingsModel.SVN_BRANCH_SECTION)) vcs_settings.extend(settings.get_ui_by_section( VcsSettingsModel.SVN_TAG_SECTION)) for name in VcsSettingsModel.GENERAL_SETTINGS: setting = settings.get_setting_by_name(name) if setting: vcs_settings.append(setting) assert vcs_settings == [] finally: self._cleanup_repo_settings(settings) def test_delete_svn_branch_and_tag_patterns( self, autologin_user, backend_svn, settings_util, csrf_token): repo = backend_svn.create_repo() repo_name = repo.repo_name branch = settings_util.create_repo_rhodecode_ui( repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch', cleanup=False) tag = settings_util.create_repo_rhodecode_ui( repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False) data = { '_method': 'delete', 'csrf_token': csrf_token } for id_ in (branch.ui_id, tag.ui_id): data['delete_svn_pattern'] = id_, self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) settings = VcsSettingsModel(repo=repo_name) assert settings.get_repo_svn_branch_patterns() == [] def test_delete_svn_branch_requires_repo_admin_permission( self, backend_svn, user_util, settings_util, csrf_token): repo = backend_svn.create_repo() repo_name = repo.repo_name user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN) logout_user_session(self.app, csrf_token) session = login_user_session( self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) csrf_token = auth.get_csrf_token(session) user_util.grant_user_permission_to_repo(repo, user, 'repository.admin') branch = settings_util.create_repo_rhodecode_ui( repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch', cleanup=False) data = { '_method': 'delete', 'csrf_token': csrf_token, 'delete_svn_pattern': branch.ui_id } self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) def test_delete_svn_branch_raises_400_when_not_found( self, autologin_user, backend_svn, settings_util, csrf_token): repo_name = backend_svn.repo_name data = { '_method': 'delete', 'delete_svn_pattern': 123, 'csrf_token': csrf_token } self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400) def test_delete_svn_branch_raises_400_when_no_id_specified( self, autologin_user, backend_svn, settings_util, csrf_token): repo_name = backend_svn.repo_name data = { '_method': 'delete', 'csrf_token': csrf_token } self.app.post( url('repo_vcs_settings', repo_name=repo_name), data, headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400) def _cleanup_repo_settings(self, settings_model): cleanup = [] ui_settings = ( VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS) for section, key in ui_settings: ui = settings_model.get_ui_by_section_and_key(section, key) if ui: cleanup.append(ui) cleanup.extend(settings_model.get_ui_by_section( VcsSettingsModel.INHERIT_SETTINGS)) cleanup.extend(settings_model.get_ui_by_section( VcsSettingsModel.SVN_BRANCH_SECTION)) cleanup.extend(settings_model.get_ui_by_section( VcsSettingsModel.SVN_TAG_SECTION)) for name in VcsSettingsModel.GENERAL_SETTINGS: setting = settings_model.get_setting_by_name(name) if setting: cleanup.append(setting) for object_ in cleanup: Session().delete(object_) Session().commit() def assert_repo_value_equals_global_value(self, response, setting): assert_response = AssertResponse(response) global_css_selector = '[name={}_inherited]'.format(setting) repo_css_selector = '[name={}]'.format(setting) repo_element = assert_response.get_element(repo_css_selector) global_element = assert_response.get_element(global_css_selector) assert repo_element.value == global_element.value def repo_on_filesystem(repo_name): try: vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) return True except Exception: return False def _get_permission_for_user(user, repo): perm = UserRepoToPerm.query()\ .filter(UserRepoToPerm.repository == Repository.get_by_repo_name(repo))\ .filter(UserRepoToPerm.user == User.get_by_username(user))\ .all() return perm