test_repos.py
308 lines
| 11.5 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
# Copyright (C) 2010-2016 RhodeCode GmbH | ||||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import tempfile | ||||
import mock | ||||
import pytest | ||||
from rhodecode.lib.exceptions import AttachedForksError | ||||
from rhodecode.lib.utils import make_db_config | ||||
from rhodecode.model.db import Repository | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.scm import ScmModel | ||||
from rhodecode.lib.utils2 import safe_unicode | ||||
class TestRepoModel: | ||||
def test_remove_repo(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
RepoModel().delete(repo=repo) | ||||
Session().commit() | ||||
repos = ScmModel().repo_scan() | ||||
assert Repository.get_by_repo_name(repo_name=backend.repo_name) is None | ||||
assert repo.repo_name not in repos | ||||
def test_remove_repo_raises_exc_when_attached_forks(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
backend.create_fork() | ||||
Session().commit() | ||||
with pytest.raises(AttachedForksError): | ||||
RepoModel().delete(repo=repo) | ||||
def test_remove_repo_delete_forks(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
fork = backend.create_fork() | ||||
Session().commit() | ||||
fork_of_fork = backend.create_fork() | ||||
Session().commit() | ||||
RepoModel().delete(repo=repo, forks='delete') | ||||
Session().commit() | ||||
assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None | ||||
assert Repository.get_by_repo_name(repo_name=fork.repo_name) is None | ||||
assert ( | ||||
Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name) | ||||
is None) | ||||
def test_remove_repo_detach_forks(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
fork = backend.create_fork() | ||||
Session().commit() | ||||
fork_of_fork = backend.create_fork() | ||||
Session().commit() | ||||
RepoModel().delete(repo=repo, forks='detach') | ||||
Session().commit() | ||||
assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None | ||||
assert ( | ||||
Repository.get_by_repo_name(repo_name=fork.repo_name) is not None) | ||||
assert ( | ||||
Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name) | ||||
is not None) | ||||
@pytest.mark.parametrize("filename, expected", [ | ||||
("README", True), | ||||
("README.rst", False), | ||||
]) | ||||
def test_filenode_is_link(self, vcsbackend, filename, expected): | ||||
repo = vcsbackend.repo | ||||
assert repo.get_commit().is_link(filename) is expected | ||||
def test_get_commit(self, backend): | ||||
backend.repo.get_commit() | ||||
def test_get_changeset_is_deprecated(self, backend): | ||||
repo = backend.repo | ||||
pytest.deprecated_call(repo.get_changeset) | ||||
def test_clone_url_encrypted_value(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
repo.clone_url = 'https://marcink:qweqwe@code.rhodecode.com' | ||||
Session().add(repo) | ||||
Session().commit() | ||||
assert repo.clone_url == 'https://marcink:qweqwe@code.rhodecode.com' | ||||
@pytest.mark.backends("git", "svn") | ||||
def test_create_filesystem_repo_installs_hooks(self, tmpdir, backend): | ||||
hook_methods = { | ||||
'git': 'install_git_hook', | ||||
'svn': 'install_svn_hooks' | ||||
} | ||||
repo = backend.create_repo() | ||||
repo_name = repo.repo_name | ||||
model = RepoModel() | ||||
repo_location = tempfile.mkdtemp() | ||||
model.repos_path = repo_location | ||||
method = hook_methods[backend.alias] | ||||
with mock.patch.object(ScmModel, method) as hooks_mock: | ||||
model._create_filesystem_repo( | ||||
repo_name, backend.alias, repo_group='', clone_uri=None) | ||||
assert hooks_mock.call_count == 1 | ||||
hook_args, hook_kwargs = hooks_mock.call_args | ||||
assert hook_args[0].name == repo_name | ||||
@pytest.mark.parametrize("use_global_config, repo_name_passed", [ | ||||
(True, False), | ||||
(False, True) | ||||
]) | ||||
def test_per_repo_config_is_generated_during_filesystem_repo_creation( | ||||
self, tmpdir, backend, use_global_config, repo_name_passed): | ||||
repo_name = 'test-{}-repo-{}'.format(backend.alias, use_global_config) | ||||
config = make_db_config() | ||||
model = RepoModel() | ||||
with mock.patch('rhodecode.model.repo.make_db_config') as config_mock: | ||||
config_mock.return_value = config | ||||
model._create_filesystem_repo( | ||||
repo_name, backend.alias, repo_group='', clone_uri=None, | ||||
use_global_config=use_global_config) | ||||
expected_repo_name = repo_name if repo_name_passed else None | ||||
expected_call = mock.call(clear_session=False, repo=expected_repo_name) | ||||
assert expected_call in config_mock.call_args_list | ||||
def test_update_commit_cache_with_config(serf, backend): | ||||
repo = backend.create_repo() | ||||
with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm: | ||||
scm_instance = mock.Mock() | ||||
scm_instance.get_commit.return_value = { | ||||
'raw_id': 40*'0' | ||||
} | ||||
scm.return_value = scm_instance | ||||
repo.update_commit_cache() | ||||
scm.assert_called_with(cache=False, config=None) | ||||
config = {'test': 'config'} | ||||
repo.update_commit_cache(config=config) | ||||
scm.assert_called_with( | ||||
cache=False, config=config) | ||||
class TestGetUsers(object): | ||||
def test_returns_active_users(self, backend, user_util): | ||||
for i in range(4): | ||||
is_active = i % 2 == 0 | ||||
user_util.create_user(active=is_active, lastname='Fake user') | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
users = RepoModel().get_users() | ||||
fake_users = [u for u in users if u['last_name'] == 'Fake user'] | ||||
assert len(fake_users) == 2 | ||||
expected_keys = ( | ||||
'id', 'first_name', 'last_name', 'username', 'icon_link', | ||||
'value_display', 'value', 'value_type') | ||||
for user in users: | ||||
assert user['value_type'] is 'user' | ||||
for key in expected_keys: | ||||
assert key in user | ||||
def test_returns_user_filtered_by_last_name(self, backend, user_util): | ||||
keywords = ('aBc', u'ünicode') | ||||
for keyword in keywords: | ||||
for i in range(2): | ||||
user_util.create_user( | ||||
active=True, lastname=u'Fake {} user'.format(keyword)) | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
keyword = keywords[1].lower() | ||||
users = RepoModel().get_users(name_contains=keyword) | ||||
fake_users = [u for u in users if u['last_name'].startswith('Fake')] | ||||
assert len(fake_users) == 2 | ||||
for user in fake_users: | ||||
assert user['last_name'] == safe_unicode('Fake ünicode user') | ||||
def test_returns_user_filtered_by_first_name(self, backend, user_util): | ||||
created_users = [] | ||||
keywords = ('aBc', u'ünicode') | ||||
for keyword in keywords: | ||||
for i in range(2): | ||||
created_users.append(user_util.create_user( | ||||
active=True, lastname='Fake user', | ||||
firstname=u'Fake {} user'.format(keyword))) | ||||
keyword = keywords[1].lower() | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
users = RepoModel().get_users(name_contains=keyword) | ||||
fake_users = [u for u in users if u['last_name'].startswith('Fake')] | ||||
assert len(fake_users) == 2 | ||||
for user in fake_users: | ||||
assert user['first_name'] == safe_unicode('Fake ünicode user') | ||||
def test_returns_user_filtered_by_username(self, backend, user_util): | ||||
created_users = [] | ||||
for i in range(5): | ||||
created_users.append(user_util.create_user( | ||||
active=True, lastname='Fake user')) | ||||
user_filter = created_users[-1].username[-2:] | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
users = RepoModel().get_users(name_contains=user_filter) | ||||
fake_users = [u for u in users if u['last_name'].startswith('Fake')] | ||||
assert len(fake_users) == 1 | ||||
assert fake_users[0]['username'] == created_users[-1].username | ||||
def test_returns_limited_user_list(self, backend, user_util): | ||||
created_users = [] | ||||
for i in range(5): | ||||
created_users.append(user_util.create_user( | ||||
active=True, lastname='Fake user')) | ||||
with mock.patch('rhodecode.lib.helpers.gravatar_url'): | ||||
users = RepoModel().get_users(name_contains='Fake', limit=3) | ||||
fake_users = [u for u in users if u['last_name'].startswith('Fake')] | ||||
assert len(fake_users) == 3 | ||||
class TestGetUserGroups(object): | ||||
def test_returns_filtered_list(self, backend, user_util): | ||||
created_groups = [] | ||||
for i in range(4): | ||||
created_groups.append( | ||||
user_util.create_user_group(users_group_active=True)) | ||||
group_filter = created_groups[-1].users_group_name[-2:] | ||||
with self._patch_user_group_list(): | ||||
groups = RepoModel().get_user_groups(group_filter) | ||||
fake_groups = [ | ||||
u for u in groups if u['value'].startswith('test_returns')] | ||||
assert len(fake_groups) == 1 | ||||
assert fake_groups[0]['value'] == created_groups[-1].users_group_name | ||||
assert fake_groups[0]['value_display'].startswith( | ||||
'Group: test_returns') | ||||
def test_returns_limited_list(self, backend, user_util): | ||||
created_groups = [] | ||||
for i in range(3): | ||||
created_groups.append( | ||||
user_util.create_user_group(users_group_active=True)) | ||||
with self._patch_user_group_list(): | ||||
groups = RepoModel().get_user_groups('test_returns') | ||||
fake_groups = [ | ||||
u for u in groups if u['value'].startswith('test_returns')] | ||||
assert len(fake_groups) == 3 | ||||
def test_returns_active_user_groups(self, backend, user_util): | ||||
for i in range(4): | ||||
is_active = i % 2 == 0 | ||||
user_util.create_user_group(users_group_active=is_active) | ||||
with self._patch_user_group_list(): | ||||
groups = RepoModel().get_user_groups() | ||||
expected = ('id', 'icon_link', 'value_display', 'value', 'value_type') | ||||
for group in groups: | ||||
assert group['value_type'] is 'user_group' | ||||
for key in expected: | ||||
assert key in group | ||||
fake_groups = [ | ||||
u for u in groups if u['value'].startswith('test_returns')] | ||||
assert len(fake_groups) == 2 | ||||
for user in fake_groups: | ||||
assert user['value_display'].startswith('Group: test_returns') | ||||
def _patch_user_group_list(self): | ||||
def side_effect(group_list, perm_set): | ||||
return group_list | ||||
return mock.patch( | ||||
'rhodecode.model.repo.UserGroupList', side_effect=side_effect) | ||||