test_repos.py
170 lines
| 6.2 KiB
| text/x-python
|
PythonLexer
r5608 | # Copyright (C) 2010-2024 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r2677 | import os | |||
import mock | ||||
import pytest | ||||
r1 | import tempfile | |||
from rhodecode.lib.exceptions import AttachedForksError | ||||
from rhodecode.lib.utils import make_db_config | ||||
from rhodecode.model.db import Repository | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.scm import ScmModel | ||||
r1676 | class TestRepoModel(object): | |||
r1 | ||||
def test_remove_repo(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
RepoModel().delete(repo=repo) | ||||
Session().commit() | ||||
repos = ScmModel().repo_scan() | ||||
assert Repository.get_by_repo_name(repo_name=backend.repo_name) is None | ||||
assert repo.repo_name not in repos | ||||
def test_remove_repo_raises_exc_when_attached_forks(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
backend.create_fork() | ||||
Session().commit() | ||||
with pytest.raises(AttachedForksError): | ||||
RepoModel().delete(repo=repo) | ||||
def test_remove_repo_delete_forks(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
fork = backend.create_fork() | ||||
Session().commit() | ||||
fork_of_fork = backend.create_fork() | ||||
Session().commit() | ||||
RepoModel().delete(repo=repo, forks='delete') | ||||
Session().commit() | ||||
assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None | ||||
assert Repository.get_by_repo_name(repo_name=fork.repo_name) is None | ||||
assert ( | ||||
Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name) | ||||
is None) | ||||
def test_remove_repo_detach_forks(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
fork = backend.create_fork() | ||||
Session().commit() | ||||
fork_of_fork = backend.create_fork() | ||||
Session().commit() | ||||
RepoModel().delete(repo=repo, forks='detach') | ||||
Session().commit() | ||||
assert Repository.get_by_repo_name(repo_name=repo.repo_name) is None | ||||
assert ( | ||||
Repository.get_by_repo_name(repo_name=fork.repo_name) is not None) | ||||
assert ( | ||||
Repository.get_by_repo_name(repo_name=fork_of_fork.repo_name) | ||||
is not None) | ||||
@pytest.mark.parametrize("filename, expected", [ | ||||
("README", True), | ||||
("README.rst", False), | ||||
]) | ||||
def test_filenode_is_link(self, vcsbackend, filename, expected): | ||||
repo = vcsbackend.repo | ||||
assert repo.get_commit().is_link(filename) is expected | ||||
def test_get_commit(self, backend): | ||||
backend.repo.get_commit() | ||||
def test_get_changeset_is_deprecated(self, backend): | ||||
repo = backend.repo | ||||
pytest.deprecated_call(repo.get_changeset) | ||||
def test_clone_url_encrypted_value(self, backend): | ||||
repo = backend.create_repo() | ||||
Session().commit() | ||||
repo.clone_url = 'https://marcink:qweqwe@code.rhodecode.com' | ||||
Session().add(repo) | ||||
Session().commit() | ||||
assert repo.clone_url == 'https://marcink:qweqwe@code.rhodecode.com' | ||||
@pytest.mark.backends("git", "svn") | ||||
def test_create_filesystem_repo_installs_hooks(self, tmpdir, backend): | ||||
repo = backend.create_repo() | ||||
repo_name = repo.repo_name | ||||
r5356 | with mock.patch('rhodecode.model.repo.RepoModel.repos_path', | |||
new_callable=mock.PropertyMock) as mocked_models_property: | ||||
mocked_models_property.return_value = tempfile.mkdtemp() | ||||
repo = RepoModel()._create_filesystem_repo( | ||||
repo_name, backend.alias, repo_group='', clone_uri=None) | ||||
r2677 | ||||
r5356 | hooks = { | |||
'svn': ('pre-commit', 'post-commit'), | ||||
'git': ('pre-receive', 'post-receive'), | ||||
} | ||||
for hook in hooks[backend.alias]: | ||||
with open(os.path.join(repo.path, 'hooks', hook)) as f: | ||||
data = f.read() | ||||
assert 'RC_HOOK_VER' in data | ||||
r1 | ||||
@pytest.mark.parametrize("use_global_config, repo_name_passed", [ | ||||
(True, False), | ||||
(False, True) | ||||
]) | ||||
def test_per_repo_config_is_generated_during_filesystem_repo_creation( | ||||
self, tmpdir, backend, use_global_config, repo_name_passed): | ||||
repo_name = 'test-{}-repo-{}'.format(backend.alias, use_global_config) | ||||
config = make_db_config() | ||||
model = RepoModel() | ||||
with mock.patch('rhodecode.model.repo.make_db_config') as config_mock: | ||||
config_mock.return_value = config | ||||
model._create_filesystem_repo( | ||||
repo_name, backend.alias, repo_group='', clone_uri=None, | ||||
use_global_config=use_global_config) | ||||
expected_repo_name = repo_name if repo_name_passed else None | ||||
expected_call = mock.call(clear_session=False, repo=expected_repo_name) | ||||
assert expected_call in config_mock.call_args_list | ||||
def test_update_commit_cache_with_config(serf, backend): | ||||
repo = backend.create_repo() | ||||
with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm: | ||||
scm_instance = mock.Mock() | ||||
scm_instance.get_commit.return_value = { | ||||
r351 | 'raw_id': 40*'0', | |||
'revision': 1 | ||||
r1 | } | |||
scm.return_value = scm_instance | ||||
repo.update_commit_cache() | ||||
scm.assert_called_with(cache=False, config=None) | ||||
config = {'test': 'config'} | ||||
repo.update_commit_cache(config=config) | ||||
scm.assert_called_with( | ||||
cache=False, config=config) | ||||