# Copyright (C) 2010-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import multiprocessing import os import shutil import mock import py import pytest import rhodecode from rhodecode.lib import caching_query from rhodecode.lib import utils from rhodecode.lib.str_utils import safe_bytes from rhodecode.model import settings from rhodecode.model import db from rhodecode.model import meta from rhodecode.model.meta import Session from rhodecode.model.repo import RepoModel from rhodecode.model.repo_group import RepoGroupModel from rhodecode.model.scm import ScmModel from rhodecode.model.settings import UiSetting, SettingsModel from rhodecode.tests.fixtures.fixture_pyramid import rhodecode_factory from rhodecode.tests.fixtures.rc_fixture import Fixture from rhodecode_tools.lib.hash_utils import md5_safe from rhodecode.lib.ext_json import json fixture = Fixture() def extract_hooks(config): """Return a dictionary with the hook entries of the given config.""" hooks = {} config_items = config.serialize() for section, name, value in config_items: if section != 'hooks': continue hooks[name] = value return hooks def disable_hooks(request, hooks): """Disables the given hooks from the UI settings.""" session = meta.Session() model = SettingsModel() for hook_key in hooks: sett = model.get_ui_by_key(hook_key) sett.ui_active = False session.add(sett) # Invalidate cache ui_settings = session.query(db.RhodeCodeUi).options( caching_query.FromCache('sql_cache_short', 'get_hg_ui_settings')) meta.cache.invalidate( ui_settings, {}, caching_query.FromCache('sql_cache_short', 'get_hg_ui_settings')) ui_settings = session.query(db.RhodeCodeUi).options( caching_query.FromCache('sql_cache_short', 'get_hook_settings')) meta.cache.invalidate( ui_settings, {}, caching_query.FromCache('sql_cache_short', 'get_hook_settings')) @request.addfinalizer def rollback(): session.rollback() HOOK_PRE_PUSH = db.RhodeCodeUi.HOOK_PRE_PUSH HOOK_PRETX_PUSH = db.RhodeCodeUi.HOOK_PRETX_PUSH HOOK_PUSH = db.RhodeCodeUi.HOOK_PUSH HOOK_PRE_PULL = db.RhodeCodeUi.HOOK_PRE_PULL HOOK_PULL = db.RhodeCodeUi.HOOK_PULL HOOK_REPO_SIZE = db.RhodeCodeUi.HOOK_REPO_SIZE HOOK_PUSH_KEY = db.RhodeCodeUi.HOOK_PUSH_KEY HG_HOOKS = frozenset( (HOOK_PRE_PULL, HOOK_PULL, HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_PUSH, HOOK_REPO_SIZE, HOOK_PUSH_KEY)) @pytest.mark.parametrize('disabled_hooks,expected_hooks', [ ([], HG_HOOKS), (HG_HOOKS, []), ([HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_REPO_SIZE, HOOK_PUSH_KEY], [HOOK_PRE_PULL, HOOK_PULL, HOOK_PUSH]), # When a pull/push hook is disabled, its pre-pull/push counterpart should # be disabled too. ([HOOK_PUSH], [HOOK_PRE_PULL, HOOK_PULL, HOOK_REPO_SIZE]), ([HOOK_PULL], [HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_PUSH, HOOK_REPO_SIZE, HOOK_PUSH_KEY]), ]) def test_make_db_config_hg_hooks(baseapp, request, disabled_hooks, expected_hooks): disable_hooks(request, disabled_hooks) config = utils.make_db_config() hooks = extract_hooks(config) assert set(hooks.keys()).intersection(HG_HOOKS) == set(expected_hooks) @pytest.mark.parametrize('disabled_hooks,expected_hooks', [ ([], ['pull', 'push']), ([HOOK_PUSH], ['pull']), ([HOOK_PULL], ['push']), ([HOOK_PULL, HOOK_PUSH], []), ]) def test_get_enabled_hook_classes(disabled_hooks, expected_hooks): hook_keys = (HOOK_PUSH, HOOK_PULL) ui_settings = [ ('hooks', key, 'some value', key not in disabled_hooks) for key in hook_keys] result = utils.get_enabled_hook_classes(ui_settings) assert sorted(result) == expected_hooks def test_get_filesystem_repos_finds_repos(tmpdir, baseapp): _stub_git_repo(tmpdir.ensure('repo', dir=True)) repos = list(utils.get_filesystem_repos(str(tmpdir))) assert repos == [('repo', ('git', tmpdir.join('repo')))] def test_get_filesystem_repos_skips_directories(tmpdir, baseapp): tmpdir.ensure('not-a-repo', dir=True) repos = list(utils.get_filesystem_repos(str(tmpdir))) assert repos == [] def test_get_filesystem_repos_skips_directories_with_repos(tmpdir, baseapp): _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True)) repos = list(utils.get_filesystem_repos(str(tmpdir))) assert repos == [] def test_get_filesystem_repos_finds_repos_in_subdirectories(tmpdir, baseapp): _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True)) repos = list(utils.get_filesystem_repos(str(tmpdir), recursive=True)) assert repos == [('subdir/repo', ('git', tmpdir.join('subdir', 'repo')))] def test_get_filesystem_repos_skips_names_starting_with_dot(tmpdir): _stub_git_repo(tmpdir.ensure('.repo', dir=True)) repos = list(utils.get_filesystem_repos(str(tmpdir))) assert repos == [] def test_get_filesystem_repos_skips_files(tmpdir): tmpdir.ensure('test-file') repos = list(utils.get_filesystem_repos(str(tmpdir))) assert repos == [] def test_get_filesystem_repos_skips_removed_repositories(tmpdir): removed_repo_name = 'rm__00000000_000000_000000__.stub' assert utils.REMOVED_REPO_PAT.match(removed_repo_name) _stub_git_repo(tmpdir.ensure(removed_repo_name, dir=True)) repos = list(utils.get_filesystem_repos(str(tmpdir))) assert repos == [] def _stub_git_repo(repo_path): """ Make `repo_path` look like a Git repository. """ repo_path.ensure('.git', dir=True) def test_get_dirpaths_returns_all_paths_on_str(tmpdir): tmpdir.ensure('test-file') tmpdir.ensure('test-file-1') tmp_path = str(tmpdir) dirpaths = utils.get_dirpaths(tmp_path) assert list(sorted(dirpaths)) == ['test-file', 'test-file-1'] def test_get_dirpaths_returns_all_paths_on_bytes(tmpdir): tmpdir.ensure('test-file-bytes') tmp_path = str(tmpdir) dirpaths = utils.get_dirpaths(safe_bytes(tmp_path)) assert list(sorted(dirpaths)) == [b'test-file-bytes'] def test_get_dirpaths_returns_all_paths_bytes( tmpdir, platform_encodes_filenames): if platform_encodes_filenames: pytest.skip("This platform seems to encode filenames.") tmpdir.ensure('repo-a-umlaut-\xe4') dirpaths = utils.get_dirpaths(str(tmpdir)) assert dirpaths == ['repo-a-umlaut-\xe4'] def test_get_dirpaths_skips_paths_it_cannot_decode( tmpdir, platform_encodes_filenames): if platform_encodes_filenames: pytest.skip("This platform seems to encode filenames.") path_with_latin1 = 'repo-a-umlaut-\xe4' tmp_path = str(tmpdir.ensure(path_with_latin1)) dirpaths = utils.get_dirpaths(tmp_path) assert dirpaths == [] @pytest.fixture(scope='session') def platform_encodes_filenames(): """ Boolean indicator if the current platform changes filename encodings. """ path_with_latin1 = 'repo-a-umlaut-\xe4' tmpdir = py.path.local.mkdtemp() tmpdir.ensure(path_with_latin1) read_path = tmpdir.listdir()[0].basename tmpdir.remove() return path_with_latin1 != read_path def test_repo2db_cleaner_removes_zombie_groups(repo_groups): session = meta.Session() zombie_group, parent_group, child_group = repo_groups zombie_path = os.path.join( RepoGroupModel().repos_path, zombie_group.full_path) os.rmdir(zombie_path) # Avoid removing test repos when calling repo2db_mapper repo_list = [repo.repo_name for repo in session.query(db.Repository).all()] utils.repo2db_cleanup(skip_repos=repo_list) groups_in_db = session.query(db.RepoGroup).all() assert child_group in groups_in_db assert parent_group in groups_in_db assert zombie_path not in groups_in_db @pytest.mark.backends("hg", "git", "svn") def test_repo2db_cleaner_removes_zombie_repos(backend): repo = backend.create_repo() zombie_path = repo.repo_full_path shutil.rmtree(zombie_path) removed, errors = utils.repo2db_cleanup() assert len(removed) == 1 assert not errors def test_repo2db_mapper_adds_new_repos(request, backend): repo = backend.create_repo() cleanup_repos = [] cleanup_groups = [] for num in range(5): copy_repo_name = f'{repo.repo_name}-{num}' copy_repo_path = f'{repo.repo_full_path}-{num}' shutil.copytree(repo.repo_full_path, copy_repo_path) cleanup_repos.append(copy_repo_name) for gr_num in range(5): gr_name = f'my_gr_{gr_num}' dest_gr = os.path.join(os.path.dirname(repo.repo_full_path), gr_name) os.makedirs(dest_gr, exist_ok=True) copy_repo_name = f'{gr_name}/{repo.repo_name}-{gr_num}' copy_repo_path = f'{dest_gr}/{repo.repo_name}-{gr_num}' shutil.copytree(repo.repo_full_path, copy_repo_path) cleanup_repos.append(copy_repo_name) cleanup_groups.append(gr_name) repo_list = ScmModel().repo_scan() added, errors = utils.repo2db_mapper(repo_list) Session().commit() assert not errors assert len(added) == 10 @request.addfinalizer def cleanup(): for _repo in cleanup_repos: del_result = RepoModel().delete(_repo, call_events=False) Session().commit() assert del_result is True for _repo_group in cleanup_groups: del_result = RepoGroupModel().delete(_repo_group, force_delete=True, call_events=False) Session().commit() assert del_result is True def test_repo2db_mapper_installs_hooks_for_repos_in_db(backend): repo = backend.create_repo() repo_list = {repo.repo_name: 'test'} added, errors = utils.repo2db_mapper(repo_list) assert not errors assert repo.scm_instance().get_hooks_info() == {'pre_version': rhodecode.__version__, 'post_version': rhodecode.__version__} @pytest.mark.backends("git", "svn") def test_repo2db_mapper_installs_hooks_for_newly_added_repos(backend): repo = backend.create_repo() RepoModel().delete(repo, fs_remove=False) meta.Session().commit() repo_list = {repo.repo_name: repo.scm_instance()} added, errors = utils.repo2db_mapper(repo_list) assert not errors assert len(added) == 1 class TestPasswordChanged(object): def setup_method(self): self.session = { 'rhodecode_user': { 'password': '0cc175b9c0f1b6a831c399e269772661' } } self.auth_user = mock.Mock() self.auth_user.userame = 'test' self.auth_user.password = 'abc123' def test_returns_false_for_default_user(self): self.auth_user.username = db.User.DEFAULT_USER result = utils.password_changed(self.auth_user, self.session) assert result is False def test_returns_false_if_password_was_not_changed(self): self.session['rhodecode_user']['password'] = md5_safe( self.auth_user.password) result = utils.password_changed(self.auth_user, self.session) assert result is False def test_returns_true_if_password_was_changed(self): result = utils.password_changed(self.auth_user, self.session) assert result is True def test_returns_true_if_auth_user_password_is_empty(self): self.auth_user.password = None result = utils.password_changed(self.auth_user, self.session) assert result is True def test_returns_true_if_session_password_is_empty(self): self.session['rhodecode_user'].pop('password') result = utils.password_changed(self.auth_user, self.session) assert result is True class TestReadOpenSourceLicenses(object): def test_success(self): utils._license_cache = None json_data = ''' { "python2.7-pytest-2.7.1": {"UNKNOWN": null}, "python2.7-Markdown-2.6.2": { "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause" } } ''' resource_string_patch = mock.patch.object( utils.pkg_resources, 'resource_string', return_value=json_data) with resource_string_patch: result = utils.read_opensource_licenses() assert result == json.loads(json_data) def test_caching(self): utils._license_cache = { "python2.7-pytest-2.7.1": { "UNKNOWN": None }, "python2.7-Markdown-2.6.2": { "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause" } } resource_patch = mock.patch.object( utils.pkg_resources, 'resource_string', side_effect=Exception) json_patch = mock.patch.object( utils.json, 'loads', side_effect=Exception) with resource_patch as resource_mock, json_patch as json_mock: result = utils.read_opensource_licenses() assert resource_mock.call_count == 0 assert json_mock.call_count == 0 assert result == utils._license_cache def test_licenses_file_contains_no_unknown_licenses(self): utils._license_cache = None result = utils.read_opensource_licenses() for license_data in result: if isinstance(license_data["license"], list): for lic_data in license_data["license"]: assert 'UNKNOWN' not in lic_data["fullName"] else: full_name = license_data.get("fullName") or license_data assert 'UNKNOWN' not in full_name class TestMakeDbConfig(object): def test_data_from_config_data_from_db_returned(self): test_data = [ ('section1', 'option1', 'value1'), ('section2', 'option2', 'value2'), ('section3', 'option3', 'value3'), ] with mock.patch.object(utils, 'prepare_config_data') as config_mock: config_mock.return_value = test_data kwargs = {'clear_session': False, 'repo': 'test_repo'} result = utils.make_db_config(**kwargs) config_mock.assert_called_once_with(**kwargs) for section, option, expected_value in test_data: value = result.get(section, option) assert value == expected_value class TestPrepareConfigData(object): def test_prepare_config_data_returns_active_settings(self): test_data = [ UiSetting('section1', 'option1', 'value1', True), UiSetting('section2', 'option2', 'value2', True), UiSetting('section3', 'option3', 'value3', False), ] repo_name = 'test_repo' model_patch = mock.patch.object(settings, 'VcsSettingsModel') hooks_patch = mock.patch.object( utils, 'get_enabled_hook_classes', return_value=['pull', 'push', 'repo_size']) with model_patch as model_mock, hooks_patch: instance_mock = mock.Mock() model_mock.return_value = instance_mock instance_mock.get_ui_settings.return_value = test_data result = utils.prepare_config_data( clear_session=False, repo=repo_name) self._assert_repo_name_passed(model_mock, repo_name) assert ('section1', 'option1', 'value1') in result assert ('section2', 'option2', 'value2') in result assert ('section3', 'option3', 'value3') not in result def _assert_repo_name_passed(self, model_mock, repo_name): assert model_mock.call_count == 1 call_args, call_kwargs = model_mock.call_args assert call_kwargs['repo'] == repo_name class TestIsDirWritable(object): def test_returns_false_when_not_writable(self): with mock.patch('builtins.open', side_effect=OSError): assert not utils._is_dir_writable('/stub-path') def test_returns_true_when_writable(self, tmpdir): assert utils._is_dir_writable(str(tmpdir)) def test_is_safe_against_race_conditions(self, tmpdir): workers = multiprocessing.Pool() directories = [str(tmpdir)] * 10 workers.map(utils._is_dir_writable, directories) class TestGetEnabledHooks(object): def test_only_active_hooks_are_enabled(self): ui_settings = [ UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True), UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True), UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', False) ] result = utils.get_enabled_hook_classes(ui_settings) assert result == ['push', 'repo_size'] def test_all_hooks_are_enabled(self): ui_settings = [ UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True), UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True), UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', True) ] result = utils.get_enabled_hook_classes(ui_settings) assert result == ['push', 'repo_size', 'pull'] def test_no_enabled_hooks_when_no_hook_settings_are_found(self): ui_settings = [] result = utils.get_enabled_hook_classes(ui_settings) assert result == [] def test_obfuscate_url_pw(): from rhodecode.lib.utils2 import obfuscate_url_pw engine = '/home/repos/malmö' assert obfuscate_url_pw(engine) @pytest.mark.parametrize("test_ua, expected", [ ("", ""), ('"quoted"', 'quoted'), ('internal-merge', 'internal-merge'), ('hg/internal-merge', 'hg/internal-merge'), ('git/internal-merge', 'git/internal-merge'), # git ('git/2.10.1 (Apple Git-78)', 'git/2.10.1'), ('GiT/2.37.2.windows.2', 'git/2.37.2'), ('git/2.35.1 (Microsoft Windows NT 10.0.19044.0; Win32NT x64) CLR/4.0.30319 VS16/16.0.0', 'git/2.35.1'), ('ssh-user-agent', 'ssh-user-agent'), ('git/ssh-user-agent', 'git/ssh-user-agent'), # hg ('mercurial/proto-1.0 (Mercurial 4.2)', 'mercurial/4.2'), ('mercurial/proto-1.0', ''), ('mercurial/proto-1.0 (Mercurial 3.9.2)', 'mercurial/3.9.2'), ('mercurial/ssh-user-agent', 'mercurial/ssh-user-agent'), ('mercurial/proto-1.0 (Mercurial 5.8rc0)', 'mercurial/5.8rc0'), ]) def test_user_agent_normalizer(test_ua, expected): from rhodecode.lib.utils2 import user_agent_normalizer assert user_agent_normalizer(test_ua, safe=False) == expected