##// END OF EJS Templates
pyro: Return new proxy instances if called from outside of a request context....
pyro: Return new proxy instances if called from outside of a request context. This allows to run code that uses the pyro connection to the VCSServer but does not execute from within a request context, e.g. invoke tasks.

File last commit:

r351:b15f3d23 default
r354:4f555b55 default
Show More
test_repos.py
309 lines | 11.5 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import tempfile
import mock
import pytest
from rhodecode.lib.exceptions import AttachedForksError
from rhodecode.lib.utils import make_db_config
from rhodecode.model.db import Repository
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.scm import ScmModel
from rhodecode.lib.utils2 import safe_unicode
class TestRepoModel:
def test_remove_repo(self, backend):
repo = backend.create_repo()
Session().commit()
RepoModel().delete(repo=repo)
Session().commit()
repos = ScmModel().repo_scan()
assert Repository.get_by_repo_name(repo_name=backend.repo_name) is None
assert repo.repo_name not in repos
def test_remove_repo_raises_exc_when_attached_forks(self, backend):
repo = backend.create_repo()
Session().commit()
backend.create_fork()
Session().commit()
with pytest.raises(AttachedForksError):
RepoModel().delete(repo=repo)
def test_remove_repo_delete_forks(self, backend):
repo = backend.create_repo()
Session().commit()
fork = backend.create_fork()
Session().commit()
fork_of_fork = backend.create_fork()
Session().commit()
RepoModel().delete(repo=repo, forks='delete')
Session().commit()
assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None
assert Repository.get_by_repo_name(repo_name=fork.repo_name) is None
assert (
Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name)
is None)
def test_remove_repo_detach_forks(self, backend):
repo = backend.create_repo()
Session().commit()
fork = backend.create_fork()
Session().commit()
fork_of_fork = backend.create_fork()
Session().commit()
RepoModel().delete(repo=repo, forks='detach')
Session().commit()
assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None
assert (
Repository.get_by_repo_name(repo_name=fork.repo_name) is not None)
assert (
Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name)
is not None)
@pytest.mark.parametrize("filename, expected", [
("README", True),
("README.rst", False),
])
def test_filenode_is_link(self, vcsbackend, filename, expected):
repo = vcsbackend.repo
assert repo.get_commit().is_link(filename) is expected
def test_get_commit(self, backend):
backend.repo.get_commit()
def test_get_changeset_is_deprecated(self, backend):
repo = backend.repo
pytest.deprecated_call(repo.get_changeset)
def test_clone_url_encrypted_value(self, backend):
repo = backend.create_repo()
Session().commit()
repo.clone_url = 'https://marcink:qweqwe@code.rhodecode.com'
Session().add(repo)
Session().commit()
assert repo.clone_url == 'https://marcink:qweqwe@code.rhodecode.com'
@pytest.mark.backends("git", "svn")
def test_create_filesystem_repo_installs_hooks(self, tmpdir, backend):
hook_methods = {
'git': 'install_git_hook',
'svn': 'install_svn_hooks'
}
repo = backend.create_repo()
repo_name = repo.repo_name
model = RepoModel()
repo_location = tempfile.mkdtemp()
model.repos_path = repo_location
method = hook_methods[backend.alias]
with mock.patch.object(ScmModel, method) as hooks_mock:
model._create_filesystem_repo(
repo_name, backend.alias, repo_group='', clone_uri=None)
assert hooks_mock.call_count == 1
hook_args, hook_kwargs = hooks_mock.call_args
assert hook_args[0].name == repo_name
@pytest.mark.parametrize("use_global_config, repo_name_passed", [
(True, False),
(False, True)
])
def test_per_repo_config_is_generated_during_filesystem_repo_creation(
self, tmpdir, backend, use_global_config, repo_name_passed):
repo_name = 'test-{}-repo-{}'.format(backend.alias, use_global_config)
config = make_db_config()
model = RepoModel()
with mock.patch('rhodecode.model.repo.make_db_config') as config_mock:
config_mock.return_value = config
model._create_filesystem_repo(
repo_name, backend.alias, repo_group='', clone_uri=None,
use_global_config=use_global_config)
expected_repo_name = repo_name if repo_name_passed else None
expected_call = mock.call(clear_session=False, repo=expected_repo_name)
assert expected_call in config_mock.call_args_list
def test_update_commit_cache_with_config(serf, backend):
repo = backend.create_repo()
with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm:
scm_instance = mock.Mock()
scm_instance.get_commit.return_value = {
tests: fixed test after update of cs cache changes.
r351 'raw_id': 40*'0',
'revision': 1
project: added all source files and assets
r1 }
scm.return_value = scm_instance
repo.update_commit_cache()
scm.assert_called_with(cache=False, config=None)
config = {'test': 'config'}
repo.update_commit_cache(config=config)
scm.assert_called_with(
cache=False, config=config)
class TestGetUsers(object):
def test_returns_active_users(self, backend, user_util):
for i in range(4):
is_active = i % 2 == 0
user_util.create_user(active=is_active, lastname='Fake user')
with mock.patch('rhodecode.lib.helpers.gravatar_url'):
users = RepoModel().get_users()
fake_users = [u for u in users if u['last_name'] == 'Fake user']
assert len(fake_users) == 2
expected_keys = (
'id', 'first_name', 'last_name', 'username', 'icon_link',
'value_display', 'value', 'value_type')
for user in users:
assert user['value_type'] is 'user'
for key in expected_keys:
assert key in user
def test_returns_user_filtered_by_last_name(self, backend, user_util):
keywords = ('aBc', u'ünicode')
for keyword in keywords:
for i in range(2):
user_util.create_user(
active=True, lastname=u'Fake {} user'.format(keyword))
with mock.patch('rhodecode.lib.helpers.gravatar_url'):
keyword = keywords[1].lower()
users = RepoModel().get_users(name_contains=keyword)
fake_users = [u for u in users if u['last_name'].startswith('Fake')]
assert len(fake_users) == 2
for user in fake_users:
assert user['last_name'] == safe_unicode('Fake ünicode user')
def test_returns_user_filtered_by_first_name(self, backend, user_util):
created_users = []
keywords = ('aBc', u'ünicode')
for keyword in keywords:
for i in range(2):
created_users.append(user_util.create_user(
active=True, lastname='Fake user',
firstname=u'Fake {} user'.format(keyword)))
keyword = keywords[1].lower()
with mock.patch('rhodecode.lib.helpers.gravatar_url'):
users = RepoModel().get_users(name_contains=keyword)
fake_users = [u for u in users if u['last_name'].startswith('Fake')]
assert len(fake_users) == 2
for user in fake_users:
assert user['first_name'] == safe_unicode('Fake ünicode user')
def test_returns_user_filtered_by_username(self, backend, user_util):
created_users = []
for i in range(5):
created_users.append(user_util.create_user(
active=True, lastname='Fake user'))
user_filter = created_users[-1].username[-2:]
with mock.patch('rhodecode.lib.helpers.gravatar_url'):
users = RepoModel().get_users(name_contains=user_filter)
fake_users = [u for u in users if u['last_name'].startswith('Fake')]
assert len(fake_users) == 1
assert fake_users[0]['username'] == created_users[-1].username
def test_returns_limited_user_list(self, backend, user_util):
created_users = []
for i in range(5):
created_users.append(user_util.create_user(
active=True, lastname='Fake user'))
with mock.patch('rhodecode.lib.helpers.gravatar_url'):
users = RepoModel().get_users(name_contains='Fake', limit=3)
fake_users = [u for u in users if u['last_name'].startswith('Fake')]
assert len(fake_users) == 3
class TestGetUserGroups(object):
def test_returns_filtered_list(self, backend, user_util):
created_groups = []
for i in range(4):
created_groups.append(
user_util.create_user_group(users_group_active=True))
group_filter = created_groups[-1].users_group_name[-2:]
with self._patch_user_group_list():
groups = RepoModel().get_user_groups(group_filter)
fake_groups = [
u for u in groups if u['value'].startswith('test_returns')]
assert len(fake_groups) == 1
assert fake_groups[0]['value'] == created_groups[-1].users_group_name
assert fake_groups[0]['value_display'].startswith(
'Group: test_returns')
def test_returns_limited_list(self, backend, user_util):
created_groups = []
for i in range(3):
created_groups.append(
user_util.create_user_group(users_group_active=True))
with self._patch_user_group_list():
groups = RepoModel().get_user_groups('test_returns')
fake_groups = [
u for u in groups if u['value'].startswith('test_returns')]
assert len(fake_groups) == 3
def test_returns_active_user_groups(self, backend, user_util):
for i in range(4):
is_active = i % 2 == 0
user_util.create_user_group(users_group_active=is_active)
with self._patch_user_group_list():
groups = RepoModel().get_user_groups()
expected = ('id', 'icon_link', 'value_display', 'value', 'value_type')
for group in groups:
assert group['value_type'] is 'user_group'
for key in expected:
assert key in group
fake_groups = [
u for u in groups if u['value'].startswith('test_returns')]
assert len(fake_groups) == 2
for user in fake_groups:
assert user['value_display'].startswith('Group: test_returns')
def _patch_user_group_list(self):
def side_effect(group_list, perm_set):
return group_list
return mock.patch(
'rhodecode.model.repo.UserGroupList', side_effect=side_effect)