# -*- coding: utf-8 -*-
# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import json
from mock import patch
import pytest
from pylons import tmpl_context as c
import rhodecode
from rhodecode.lib.utils import map_groups
from rhodecode.model.db import Repository, User, RepoGroup
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.settings import SettingsModel
from rhodecode.tests import TestController, url, TEST_USER_ADMIN_LOGIN
from rhodecode.tests.fixture import Fixture
# Module-level instance of the test-suite ``Fixture`` helper imported above.
fixture = Fixture()
class TestHomeController(TestController):
def test_index(self):
    """Check the home index page after logging in.

    The page must contain the 'Add Repository' text and, inside the JSON
    embedded in its JavaScript, a ``name_raw`` entry for every repository.
    """
    self.log_user()
    response = self.app.get(url(controller='home', action='index'))

    # present if the global permission is set
    response.mustcontain('Add Repository')

    # Search for objects inside the JavaScript JSON. ``get_all`` is the
    # accessor spelling the other tests in this module already use.
    for repo in Repository.get_all():
        response.mustcontain('"name_raw": "%s"' % repo.repo_name)
def test_index_contains_statics_with_ver(self):
    """Static asset references on the index page carry the version hash."""
    self.log_user()
    index_url = url(controller='home', action='index')
    response = self.app.get(index_url)

    # the hash is read from the template context after the request
    version_hash = c.rhodecode_version_hash
    asset_templates = (
        'style.css?ver={0}',
        'rhodecode-components.js?ver={0}',
    )
    for template in asset_templates:
        response.mustcontain(template.format(version_hash))
# Logs in, requests the home index page and reads the raw id of the commit
# returned by ``backend.repo.get_commit()``.
def test_index_contains_backend_specific_details(self, backend):
self.log_user()
response = self.app.get(url(controller='home', action='index'))
tip = backend.repo.get_commit().raw_id
# html in javascript variable:
# NOTE(review): from here to the end of this span the text looks damaged:
# the raw string below is empty and its call is never closed, ``user`` is
# not defined anywhere in this view, and the quoted ``alert(...)`` literals
# further down are not valid Python. HTML inside the string literals (and
# presumably the ``def`` lines of further tests) seems to have been
# stripped. Restore this span from version control instead of repairing it
# by hand - TODO confirm against the original file.
response.mustcontain(r''
user.lastname = (
'')
Session().add(user)
Session().commit()
response = self.app.get(url(controller='home', action='index'))
response.mustcontain(
'<img src="/image1" onload="'
'alert('Hello, World!');">')
response.mustcontain(
'<img src="/image2" onload="'
'alert('Hello, World!');">')
@pytest.mark.parametrize("name, state", [
    ('Disabled', False),
    ('Enabled', True),
])
def test_index_show_version(self, autologin_user, name, state):
    """The version banner follows the ``show_version`` setting.

    The setting is stored with the parametrized ``state``, the settings
    cache is invalidated, and the index page is then expected to contain
    the version string only when ``state`` is ``True``.
    """
    setting = SettingsModel().create_or_update_setting(
        'show_version', state, 'bool')
    Session().add(setting)
    Session().commit()
    SettingsModel().invalidate_settings_cache()

    index_page = self.app.get(url(controller='home', action='index'))
    banner = 'RhodeCode Enterprise %s' % rhodecode.__version__

    if state is True:
        index_page.mustcontain(banner)
    elif state is False:
        index_page.mustcontain(no=[banner])
class TestUserAutocompleteData(TestController):
    """Tests for the ``user_autocomplete_data`` action of the home controller."""

    def _autocomplete(self, **url_args):
        """Request the autocomplete action as XHR and return the decoded JSON.

        ``url_args`` are passed to ``url()`` next to the controller/action
        pair; the request is expected to answer with status 200.
        """
        endpoint = url(
            controller='home', action='user_autocomplete_data', **url_args)
        xhr_headers = {'X-REQUESTED-WITH': 'XMLHttpRequest'}
        response = self.app.get(endpoint, headers=xhr_headers, status=200)
        return json.loads(response.body)

    @staticmethod
    def _suggested_values(payload):
        """Return the ``value`` of every entry in ``payload['suggestions']``."""
        return [entry['value'] for entry in payload['suggestions']]

    def test_returns_list_of_users(self, user_util):
        """A freshly created active user shows up among the suggestions."""
        self.log_user()
        expected_name = user_util.create_user(is_active=True).username

        payload = self._autocomplete()

        assert expected_name in self._suggested_values(payload)

    def test_returns_inactive_users_when_active_flag_sent(self, user_util):
        """With ``active='0'`` an inactive user is suggested as well."""
        self.log_user()
        expected_name = user_util.create_user(is_active=False).username

        payload = self._autocomplete(user_groups='true', active='0')

        assert expected_name in self._suggested_values(payload)

    def test_returns_groups_when_user_groups_sent(self, user_util):
        """With ``user_groups='true'`` user groups are suggested too."""
        self.log_user()
        created_group = user_util.create_user_group(user_groups_active=True)
        expected_name = created_group.users_group_name

        payload = self._autocomplete(user_groups='true')

        assert expected_name in self._suggested_values(payload)

    def test_result_is_limited_when_query_is_sent(self):
        """Both lookups receive the query and 20 suggestions are returned.

        ``RepoModel.get_users`` and ``RepoModel.get_user_groups`` are
        patched to return the same ten fake entries each.
        """
        self.log_user()
        query = 'hello'

        fake_result = []
        for i in range(10):
            login = 'hello{}'.format(i)
            fake_result.append({
                'first_name': 'John',
                'value_display': 'hello{} (John Smith)'.format(i),
                'icon_link': '/images/user14.png',
                'value': login,
                'last_name': 'Smith',
                'username': login,
                'id': i,
                'value_type': u'user',
            })

        patched_users = patch.object(
            RepoModel, 'get_users', return_value=fake_result)
        patched_groups = patch.object(
            RepoModel, 'get_user_groups', return_value=fake_result)

        with patched_users as users_mock, patched_groups as groups_mock:
            payload = self._autocomplete(user_groups='true', query=query)

        expected_call = dict(name_contains=query, only_active=True)
        users_mock.assert_called_once_with(**expected_call)
        groups_mock.assert_called_once_with(**expected_call)
        assert len(payload['suggestions']) == 20
def assert_and_get_content(result):
    """Validate result entries and split them by their ``type``.

    Every item found under the ``children`` key of each element of
    ``result`` must have truthy ``id``, ``text`` and ``url`` values and a
    ``type`` of ``repo``, ``group`` or ``commit``.

    :param result: iterable of mappings that each hold a ``children`` list
    :returns: tuple ``(repos, groups, commits)`` of lists, items kept in
        the order they were encountered
    :raises Exception: when an item has any other ``type``
    """
    repos, groups, commits = [], [], []
    collectors = (
        ('repo', repos),
        ('group', groups),
        ('commit', commits),
    )

    for section in result:
        for entry in section['children']:
            for required_key in ('id', 'text', 'url'):
                assert entry[required_key]

            entry_type = entry['type']
            for type_name, bucket in collectors:
                if entry_type == type_name:
                    bucket.append(entry)
                    break
            else:
                # no known type matched
                raise Exception('invalid type %s' % entry_type)

    return repos, groups, commits
class TestGotoSwitcherData(TestController):
    """Tests for the ``goto_switcher_data`` action of the home controller.

    A class-scoped, auto-used fixture creates one 'hg' repository for every
    path in ``required_repos_with_groups`` (building the repository groups
    the path implies) and removes them again when the class is done.
    """

    required_repos_with_groups = [
        'abc',
        'abc-fork',
        'forks/abcd',
        'abcd',
        'abcde',
        'a/abc',
        'aa/abc',
        'aaa/abc',
        'aaaa/abc',
        'repos_abc/aaa/abc',
        'abc_repos/abc',
        'abc_repos/abcd',
        'xxx/xyz',
        'forked-abc/a/abc'
    ]

    @pytest.fixture(autouse=True, scope='class')
    def prepare(self, request, pylonsapp):
        """Create the repositories and groups the tests of this class need."""
        # Register the teardown before creating anything: if creation fails
        # part-way through, whatever already exists is still removed.
        # ``cleanup`` skips repositories and groups that are not there.
        request.addfinalizer(self.cleanup)

        for repo_and_group in self.required_repos_with_groups:
            # create structure of groups and return the last group
            repo_group = map_groups(repo_and_group)
            RepoModel()._create_repo(
                repo_and_group, 'hg', 'test-ac', TEST_USER_ADMIN_LOGIN,
                repo_group=getattr(repo_group, 'group_id', None))
            Session().commit()

    def cleanup(self):
        """Delete the repositories created by ``prepare``, then their groups."""
        # first delete all repos
        for repo_and_groups in self.required_repos_with_groups:
            repo = Repository.get_by_repo_name(repo_and_groups)
            if repo:
                RepoModel().delete(repo)
                Session().commit()

        # then delete all empty groups
        for repo_and_groups in self.required_repos_with_groups:
            if '/' not in repo_and_groups:
                continue
            r_group = repo_and_groups.rsplit('/', 1)[0]
            repo_group = RepoGroup.get_by_group_name(r_group)
            if not repo_group:
                # already removed while handling an earlier path
                continue

            # remember the ancestors before the group itself is deleted
            parents = repo_group.parents
            RepoGroupModel().delete(repo_group, force_delete=True)
            Session().commit()

            # walk the ancestors in reverse of the order ``parents`` gives
            for el in reversed(parents):
                RepoGroupModel().delete(el, force_delete=True)
                Session().commit()

    def _switcher_content(self, params=None):
        """Request the switcher data as XHR and return the split results.

        :param params: optional query parameters for the request
        :returns: ``(repos, groups, commits)`` as produced by
            ``assert_and_get_content``
        """
        response = self.app.get(
            url(controller='home', action='goto_switcher_data'),
            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', },
            params=params, status=200)
        result = json.loads(response.body)['results']
        return assert_and_get_content(result)

    def test_returns_list_of_repos_and_groups(self):
        """Without a query all repositories and all groups are returned."""
        self.log_user()

        repos, groups, commits = self._switcher_content()

        assert len(repos) == len(Repository.get_all())
        assert len(groups) == len(RepoGroup.get_all())
        assert len(commits) == 0

    def test_returns_list_of_repos_and_groups_filtered(self):
        """Query 'abc' yields 13 repositories, 5 groups and no commits."""
        self.log_user()

        repos, groups, commits = self._switcher_content({'query': 'abc'})

        assert len(repos) == 13
        assert len(groups) == 5
        assert len(commits) == 0

    def test_returns_list_of_properly_sorted_and_filtered(self):
        """The first four repositories and groups come in the expected order."""
        self.log_user()

        repos, groups, commits = self._switcher_content({'query': 'abc'})

        test_repos = [x['text'] for x in repos[:4]]
        assert ['abc', 'abcd', 'a/abc', 'abcde'] == test_repos

        test_groups = [x['text'] for x in groups[:4]]
        assert ['abc_repos', 'repos_abc',
                'forked-abc', 'forked-abc/a'] == test_groups
class TestRepoListData(TestController):
    """Tests for the ``repo_list_data`` action of the home controller."""

    def _repo_list_content(self, params=None):
        """Request the repo list as XHR and return the split results.

        :param params: optional query parameters for the request
        :returns: ``(repos, groups, commits)`` as produced by
            ``assert_and_get_content``
        """
        endpoint = url(controller='home', action='repo_list_data')
        response = self.app.get(
            endpoint,
            headers={'X-REQUESTED-WITH': 'XMLHttpRequest'},
            params=params,
            status=200)
        return assert_and_get_content(json.loads(response.body)['results'])

    @staticmethod
    def _count_repos_named_like(pattern):
        """Count repositories whose name matches ``pattern`` (ILIKE)."""
        matching = Repository.query().filter(
            Repository.repo_name.ilike(pattern)).all()
        return len(matching)

    def test_returns_list_of_repos_and_groups(self, user_util):
        """Without a query every repository is listed, and nothing else."""
        self.log_user()

        repos, groups, commits = self._repo_list_content()

        assert len(repos) == len(Repository.get_all())
        assert len(groups) == 0
        assert len(commits) == 0

    def test_returns_list_of_repos_and_groups_filtered(self):
        """A name query returns exactly the repositories matching it."""
        self.log_user()

        repos, groups, commits = self._repo_list_content(
            {'query': 'vcs_test_git'})

        assert len(repos) == self._count_repos_named_like('%vcs_test_git%')
        assert len(groups) == 0
        assert len(commits) == 0

    def test_returns_list_of_repos_and_groups_filtered_with_type(self):
        """Same name query, additionally sent with ``repo_type`` 'git'."""
        self.log_user()

        repos, groups, commits = self._repo_list_content(
            {'query': 'vcs_test_git', 'repo_type': 'git'})

        assert len(repos) == self._count_repos_named_like('%vcs_test_git%')
        assert len(groups) == 0
        assert len(commits) == 0

    def test_returns_list_of_repos_non_ascii_query(self):
        """A non-ASCII query is answered with status 200 and no results."""
        self.log_user()

        repos, groups, commits = self._repo_list_content(
            {'query': 'ć_vcs_test_ą', 'repo_type': 'git'})

        assert len(repos) == 0
        assert len(groups) == 0
        assert len(commits) == 0