# -*- coding: utf-8 -*- # Copyright (C) 2010-2017 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import json from mock import patch import pytest from pylons import tmpl_context as c import rhodecode from rhodecode.lib.utils import map_groups from rhodecode.model.db import Repository, User, RepoGroup from rhodecode.model.meta import Session from rhodecode.model.repo import RepoModel from rhodecode.model.repo_group import RepoGroupModel from rhodecode.model.settings import SettingsModel from rhodecode.tests import TestController, url, TEST_USER_ADMIN_LOGIN from rhodecode.tests.fixture import Fixture fixture = Fixture() class TestHomeController(TestController): def test_index(self): self.log_user() response = self.app.get(url(controller='home', action='index')) # if global permission is set response.mustcontain('Add Repository') # search for objects inside the JavaScript JSON for repo in Repository.getAll(): response.mustcontain('"name_raw": "%s"' % repo.repo_name) def test_index_contains_statics_with_ver(self): self.log_user() response = self.app.get(url(controller='home', action='index')) rhodecode_version_hash = c.rhodecode_version_hash response.mustcontain('style.css?ver={0}'.format(rhodecode_version_hash)) response.mustcontain('rhodecode-components.js?ver={0}'.format(rhodecode_version_hash)) def test_index_contains_backend_specific_details(self, backend): self.log_user() response = self.app.get(url(controller='home', action='index')) tip = backend.repo.get_commit().raw_id # html in javascript variable: response.mustcontain(r'<i class=\"icon-%s\"' % (backend.alias, )) response.mustcontain(r'href=\"/%s\"' % (backend.repo_name, )) response.mustcontain("""/%s/changeset/%s""" % (backend.repo_name, tip)) response.mustcontain("""Added a symlink""") def test_index_with_anonymous_access_disabled(self): with fixture.anon_access(False): response = self.app.get(url(controller='home', action='index'), status=302) assert 'login' in response.location def test_index_page_on_groups(self, autologin_user, repo_group): response = self.app.get(url('repo_group_home', group_name='gr1')) response.mustcontain("gr1/repo_in_group") def test_index_page_on_group_with_trailing_slash( self, autologin_user, repo_group): response = self.app.get(url('repo_group_home', group_name='gr1') + '/') response.mustcontain("gr1/repo_in_group") @pytest.fixture(scope='class') def repo_group(self, request): gr = fixture.create_repo_group('gr1') fixture.create_repo(name='gr1/repo_in_group', repo_group=gr) @request.addfinalizer def cleanup(): RepoModel().delete('gr1/repo_in_group') RepoGroupModel().delete(repo_group='gr1', force_delete=True) Session().commit() def test_index_with_name_with_tags(self, autologin_user): user = User.get_by_username('test_admin') user.name = '<img src="/image1" onload="alert(\'Hello, World!\');">' user.lastname = ( '<img src="/image2" onload="alert(\'Hello, World!\');">') Session().add(user) Session().commit() response = self.app.get(url(controller='home', action='index')) response.mustcontain( '<img src="/image1" onload="' 'alert('Hello, World!');">') response.mustcontain( '<img src="/image2" onload="' 'alert('Hello, World!');">') @pytest.mark.parametrize("name, state", [ ('Disabled', False), ('Enabled', True), ]) def test_index_show_version(self, autologin_user, name, state): version_string = 'RhodeCode Enterprise %s' % rhodecode.__version__ sett = SettingsModel().create_or_update_setting( 'show_version', state, 'bool') Session().add(sett) Session().commit() SettingsModel().invalidate_settings_cache() response = self.app.get(url(controller='home', action='index')) if state is True: response.mustcontain(version_string) if state is False: response.mustcontain(no=[version_string]) class TestUserAutocompleteData(TestController): def test_returns_list_of_users(self, user_util): self.log_user() user = user_util.create_user(is_active=True) user_name = user.username response = self.app.get( url(controller='home', action='user_autocomplete_data'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) result = json.loads(response.body) values = [suggestion['value'] for suggestion in result['suggestions']] assert user_name in values def test_returns_inactive_users_when_active_flag_sent(self, user_util): self.log_user() user = user_util.create_user(is_active=False) user_name = user.username response = self.app.get( url(controller='home', action='user_autocomplete_data', user_groups='true', active='0'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) result = json.loads(response.body) values = [suggestion['value'] for suggestion in result['suggestions']] assert user_name in values def test_returns_groups_when_user_groups_sent(self, user_util): self.log_user() group = user_util.create_user_group(user_groups_active=True) group_name = group.users_group_name response = self.app.get( url(controller='home', action='user_autocomplete_data', user_groups='true'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) result = json.loads(response.body) values = [suggestion['value'] for suggestion in result['suggestions']] assert group_name in values def test_result_is_limited_when_query_is_sent(self): self.log_user() fake_result = [ { 'first_name': 'John', 'value_display': 'hello{} (John Smith)'.format(i), 'icon_link': '/images/user14.png', 'value': 'hello{}'.format(i), 'last_name': 'Smith', 'username': 'hello{}'.format(i), 'id': i, 'value_type': u'user' } for i in range(10) ] users_patcher = patch.object( RepoModel, 'get_users', return_value=fake_result) groups_patcher = patch.object( RepoModel, 'get_user_groups', return_value=fake_result) query = 'hello' with users_patcher as users_mock, groups_patcher as groups_mock: response = self.app.get( url(controller='home', action='user_autocomplete_data', user_groups='true', query=query), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) result = json.loads(response.body) users_mock.assert_called_once_with( name_contains=query, only_active=True) groups_mock.assert_called_once_with( name_contains=query, only_active=True) assert len(result['suggestions']) == 20 def assert_and_get_content(result): repos = [] groups = [] commits = [] for data in result: for data_item in data['children']: assert data_item['id'] assert data_item['text'] assert data_item['url'] if data_item['type'] == 'repo': repos.append(data_item) elif data_item['type'] == 'group': groups.append(data_item) elif data_item['type'] == 'commit': commits.append(data_item) else: raise Exception('invalid type %s' % data_item['type']) return repos, groups, commits class TestGotoSwitcherData(TestController): required_repos_with_groups = [ 'abc', 'abc-fork', 'forks/abcd', 'abcd', 'abcde', 'a/abc', 'aa/abc', 'aaa/abc', 'aaaa/abc', 'repos_abc/aaa/abc', 'abc_repos/abc', 'abc_repos/abcd', 'xxx/xyz', 'forked-abc/a/abc' ] @pytest.fixture(autouse=True, scope='class') def prepare(self, request, pylonsapp): for repo_and_group in self.required_repos_with_groups: # create structure of groups and return the last group repo_group = map_groups(repo_and_group) RepoModel()._create_repo( repo_and_group, 'hg', 'test-ac', TEST_USER_ADMIN_LOGIN, repo_group=getattr(repo_group, 'group_id', None)) Session().commit() request.addfinalizer(self.cleanup) def cleanup(self): # first delete all repos for repo_and_groups in self.required_repos_with_groups: repo = Repository.get_by_repo_name(repo_and_groups) if repo: RepoModel().delete(repo) Session().commit() # then delete all empty groups for repo_and_groups in self.required_repos_with_groups: if '/' in repo_and_groups: r_group = repo_and_groups.rsplit('/', 1)[0] repo_group = RepoGroup.get_by_group_name(r_group) if not repo_group: continue parents = repo_group.parents RepoGroupModel().delete(repo_group, force_delete=True) Session().commit() for el in reversed(parents): RepoGroupModel().delete(el, force_delete=True) Session().commit() def test_returns_list_of_repos_and_groups(self): self.log_user() response = self.app.get( url(controller='home', action='goto_switcher_data'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) result = json.loads(response.body)['results'] repos, groups, commits = assert_and_get_content(result) assert len(repos) == len(Repository.get_all()) assert len(groups) == len(RepoGroup.get_all()) assert len(commits) == 0 def test_returns_list_of_repos_and_groups_filtered(self): self.log_user() response = self.app.get( url(controller='home', action='goto_switcher_data'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, params={'query': 'abc'}, status=200) result = json.loads(response.body)['results'] repos, groups, commits = assert_and_get_content(result) assert len(repos) == 13 assert len(groups) == 5 assert len(commits) == 0 def test_returns_list_of_properly_sorted_and_filtered(self): self.log_user() response = self.app.get( url(controller='home', action='goto_switcher_data'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, params={'query': 'abc'}, status=200) result = json.loads(response.body)['results'] repos, groups, commits = assert_and_get_content(result) test_repos = [x['text'] for x in repos[:4]] assert ['abc', 'abcd', 'a/abc', 'abcde'] == test_repos test_groups = [x['text'] for x in groups[:4]] assert ['abc_repos', 'repos_abc', 'forked-abc', 'forked-abc/a'] == test_groups class TestRepoListData(TestController): def test_returns_list_of_repos_and_groups(self, user_util): self.log_user() response = self.app.get( url(controller='home', action='repo_list_data'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) result = json.loads(response.body)['results'] repos, groups, commits = assert_and_get_content(result) assert len(repos) == len(Repository.get_all()) assert len(groups) == 0 assert len(commits) == 0 def test_returns_list_of_repos_and_groups_filtered(self): self.log_user() response = self.app.get( url(controller='home', action='repo_list_data'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, params={'query': 'vcs_test_git'}, status=200) result = json.loads(response.body)['results'] repos, groups, commits = assert_and_get_content(result) assert len(repos) == len(Repository.query().filter( Repository.repo_name.ilike('%vcs_test_git%')).all()) assert len(groups) == 0 assert len(commits) == 0 def test_returns_list_of_repos_and_groups_filtered_with_type(self): self.log_user() response = self.app.get( url(controller='home', action='repo_list_data'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, params={'query': 'vcs_test_git', 'repo_type': 'git'}, status=200) result = json.loads(response.body)['results'] repos, groups, commits = assert_and_get_content(result) assert len(repos) == len(Repository.query().filter( Repository.repo_name.ilike('%vcs_test_git%')).all()) assert len(groups) == 0 assert len(commits) == 0 def test_returns_list_of_repos_non_ascii_query(self): self.log_user() response = self.app.get( url(controller='home', action='repo_list_data'), headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, params={'query': 'ć_vcs_test_ą', 'repo_type': 'git'}, status=200) result = json.loads(response.body)['results'] repos, groups, commits = assert_and_get_content(result) assert len(repos) == 0 assert len(groups) == 0 assert len(commits) == 0