# -*- coding: utf-8 -*- # Copyright (C) 2010-2017 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import tempfile import mock import pytest from rhodecode.lib.exceptions import AttachedForksError from rhodecode.lib.utils import make_db_config from rhodecode.model.db import Repository from rhodecode.model.meta import Session from rhodecode.model.repo import RepoModel from rhodecode.model.scm import ScmModel from rhodecode.lib.utils2 import safe_unicode class TestRepoModel: def test_remove_repo(self, backend): repo = backend.create_repo() Session().commit() RepoModel().delete(repo=repo) Session().commit() repos = ScmModel().repo_scan() assert Repository.get_by_repo_name(repo_name=backend.repo_name) is None assert repo.repo_name not in repos def test_remove_repo_raises_exc_when_attached_forks(self, backend): repo = backend.create_repo() Session().commit() backend.create_fork() Session().commit() with pytest.raises(AttachedForksError): RepoModel().delete(repo=repo) def test_remove_repo_delete_forks(self, backend): repo = backend.create_repo() Session().commit() fork = backend.create_fork() Session().commit() fork_of_fork = backend.create_fork() Session().commit() RepoModel().delete(repo=repo, forks='delete') Session().commit() assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None assert Repository.get_by_repo_name(repo_name=fork.repo_name) is None assert ( Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name) is None) def test_remove_repo_detach_forks(self, backend): repo = backend.create_repo() Session().commit() fork = backend.create_fork() Session().commit() fork_of_fork = backend.create_fork() Session().commit() RepoModel().delete(repo=repo, forks='detach') Session().commit() assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None assert ( Repository.get_by_repo_name(repo_name=fork.repo_name) is not None) assert ( Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name) is not None) @pytest.mark.parametrize("filename, expected", [ ("README", True), ("README.rst", False), ]) def test_filenode_is_link(self, vcsbackend, filename, expected): repo = vcsbackend.repo assert repo.get_commit().is_link(filename) is expected def test_get_commit(self, backend): backend.repo.get_commit() def test_get_changeset_is_deprecated(self, backend): repo = backend.repo pytest.deprecated_call(repo.get_changeset) def test_clone_url_encrypted_value(self, backend): repo = backend.create_repo() Session().commit() repo.clone_url = 'https://marcink:qweqwe@code.rhodecode.com' Session().add(repo) Session().commit() assert repo.clone_url == 'https://marcink:qweqwe@code.rhodecode.com' @pytest.mark.backends("git", "svn") def test_create_filesystem_repo_installs_hooks(self, tmpdir, backend): hook_methods = { 'git': 'install_git_hook', 'svn': 'install_svn_hooks' } repo = backend.create_repo() repo_name = repo.repo_name model = RepoModel() repo_location = tempfile.mkdtemp() model.repos_path = repo_location method = hook_methods[backend.alias] with mock.patch.object(ScmModel, method) as hooks_mock: model._create_filesystem_repo( repo_name, backend.alias, repo_group='', clone_uri=None) assert hooks_mock.call_count == 1 hook_args, hook_kwargs = hooks_mock.call_args assert hook_args[0].name == repo_name @pytest.mark.parametrize("use_global_config, repo_name_passed", [ (True, False), (False, True) ]) def test_per_repo_config_is_generated_during_filesystem_repo_creation( self, tmpdir, backend, use_global_config, repo_name_passed): repo_name = 'test-{}-repo-{}'.format(backend.alias, use_global_config) config = make_db_config() model = RepoModel() with mock.patch('rhodecode.model.repo.make_db_config') as config_mock: config_mock.return_value = config model._create_filesystem_repo( repo_name, backend.alias, repo_group='', clone_uri=None, use_global_config=use_global_config) expected_repo_name = repo_name if repo_name_passed else None expected_call = mock.call(clear_session=False, repo=expected_repo_name) assert expected_call in config_mock.call_args_list def test_update_commit_cache_with_config(serf, backend): repo = backend.create_repo() with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm: scm_instance = mock.Mock() scm_instance.get_commit.return_value = { 'raw_id': 40*'0', 'revision': 1 } scm.return_value = scm_instance repo.update_commit_cache() scm.assert_called_with(cache=False, config=None) config = {'test': 'config'} repo.update_commit_cache(config=config) scm.assert_called_with( cache=False, config=config) class TestGetUsers(object): def test_returns_active_users(self, backend, user_util): for i in range(4): is_active = i % 2 == 0 user_util.create_user(active=is_active, lastname='Fake user') with mock.patch('rhodecode.lib.helpers.gravatar_url'): users = RepoModel().get_users() fake_users = [u for u in users if u['last_name'] == 'Fake user'] assert len(fake_users) == 2 expected_keys = ( 'id', 'first_name', 'last_name', 'username', 'icon_link', 'value_display', 'value', 'value_type') for user in users: assert user['value_type'] is 'user' for key in expected_keys: assert key in user def test_returns_user_filtered_by_last_name(self, backend, user_util): keywords = ('aBc', u'ünicode') for keyword in keywords: for i in range(2): user_util.create_user( active=True, lastname=u'Fake {} user'.format(keyword)) with mock.patch('rhodecode.lib.helpers.gravatar_url'): keyword = keywords[1].lower() users = RepoModel().get_users(name_contains=keyword) fake_users = [u for u in users if u['last_name'].startswith('Fake')] assert len(fake_users) == 2 for user in fake_users: assert user['last_name'] == safe_unicode('Fake ünicode user') def test_returns_user_filtered_by_first_name(self, backend, user_util): created_users = [] keywords = ('aBc', u'ünicode') for keyword in keywords: for i in range(2): created_users.append(user_util.create_user( active=True, lastname='Fake user', firstname=u'Fake {} user'.format(keyword))) keyword = keywords[1].lower() with mock.patch('rhodecode.lib.helpers.gravatar_url'): users = RepoModel().get_users(name_contains=keyword) fake_users = [u for u in users if u['last_name'].startswith('Fake')] assert len(fake_users) == 2 for user in fake_users: assert user['first_name'] == safe_unicode('Fake ünicode user') def test_returns_user_filtered_by_username(self, backend, user_util): created_users = [] for i in range(5): created_users.append(user_util.create_user( active=True, lastname='Fake user')) user_filter = created_users[-1].username[-2:] with mock.patch('rhodecode.lib.helpers.gravatar_url'): users = RepoModel().get_users(name_contains=user_filter) fake_users = [u for u in users if u['last_name'].startswith('Fake')] assert len(fake_users) == 1 assert fake_users[0]['username'] == created_users[-1].username def test_returns_limited_user_list(self, backend, user_util): created_users = [] for i in range(5): created_users.append(user_util.create_user( active=True, lastname='Fake user')) with mock.patch('rhodecode.lib.helpers.gravatar_url'): users = RepoModel().get_users(name_contains='Fake', limit=3) fake_users = [u for u in users if u['last_name'].startswith('Fake')] assert len(fake_users) == 3 class TestGetUserGroups(object): def test_returns_filtered_list(self, backend, user_util): created_groups = [] for i in range(4): created_groups.append( user_util.create_user_group(users_group_active=True)) group_filter = created_groups[-1].users_group_name[-2:] with mock.patch('rhodecode.lib.helpers.gravatar_url'): with self._patch_user_group_list(): groups = RepoModel().get_user_groups(group_filter) fake_groups = [ u for u in groups if u['value'].startswith('test_returns')] assert len(fake_groups) == 1 assert fake_groups[0]['value'] == created_groups[-1].users_group_name assert fake_groups[0]['value_display'].startswith( 'Group: test_returns') def test_returns_limited_list(self, backend, user_util): created_groups = [] for i in range(3): created_groups.append( user_util.create_user_group(users_group_active=True)) with mock.patch('rhodecode.lib.helpers.gravatar_url'): with self._patch_user_group_list(): groups = RepoModel().get_user_groups('test_returns') fake_groups = [ u for u in groups if u['value'].startswith('test_returns')] assert len(fake_groups) == 3 def test_returns_active_user_groups(self, backend, user_util): for i in range(4): is_active = i % 2 == 0 user_util.create_user_group(users_group_active=is_active) with mock.patch('rhodecode.lib.helpers.gravatar_url'): with self._patch_user_group_list(): groups = RepoModel().get_user_groups() expected = ('id', 'icon_link', 'value_display', 'value', 'value_type') for group in groups: assert group['value_type'] is 'user_group' for key in expected: assert key in group fake_groups = [ u for u in groups if u['value'].startswith('test_returns')] assert len(fake_groups) == 2 for user in fake_groups: assert user['value_display'].startswith('Group: test_returns') def _patch_user_group_list(self): def side_effect(group_list, perm_set): return group_list return mock.patch( 'rhodecode.model.repo.UserGroupList', side_effect=side_effect)