test_utils.py
489 lines
| 17.0 KiB
| text/x-python
|
PythonLexer
r1 | ||||
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import multiprocessing | ||||
import os | ||||
import mock | ||||
import py | ||||
import pytest | ||||
from rhodecode.lib import caching_query | ||||
from rhodecode.lib import utils | ||||
r5087 | from rhodecode.lib.str_utils import safe_bytes | |||
r274 | from rhodecode.model import settings | |||
r1 | from rhodecode.model import db | |||
from rhodecode.model import meta | ||||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.repo_group import RepoGroupModel | ||||
from rhodecode.model.settings import UiSetting, SettingsModel | ||||
from rhodecode.tests.fixture import Fixture | ||||
r5087 | from rhodecode_tools.lib.hash_utils import md5_safe | |||
from rhodecode.lib.ext_json import json | ||||
r1 | ||||
fixture = Fixture() | ||||
def extract_hooks(config): | ||||
"""Return a dictionary with the hook entries of the given config.""" | ||||
hooks = {} | ||||
config_items = config.serialize() | ||||
for section, name, value in config_items: | ||||
if section != 'hooks': | ||||
continue | ||||
hooks[name] = value | ||||
return hooks | ||||
def disable_hooks(request, hooks): | ||||
"""Disables the given hooks from the UI settings.""" | ||||
session = meta.Session() | ||||
model = SettingsModel() | ||||
for hook_key in hooks: | ||||
sett = model.get_ui_by_key(hook_key) | ||||
sett.ui_active = False | ||||
session.add(sett) | ||||
# Invalidate cache | ||||
ui_settings = session.query(db.RhodeCodeUi).options( | ||||
caching_query.FromCache('sql_cache_short', 'get_hg_ui_settings')) | ||||
r5087 | ||||
meta.cache.invalidate( | ||||
ui_settings, {}, | ||||
caching_query.FromCache('sql_cache_short', 'get_hg_ui_settings')) | ||||
r1 | ||||
ui_settings = session.query(db.RhodeCodeUi).options( | ||||
r2883 | caching_query.FromCache('sql_cache_short', 'get_hook_settings')) | |||
r5087 | ||||
meta.cache.invalidate( | ||||
ui_settings, {}, | ||||
caching_query.FromCache('sql_cache_short', 'get_hook_settings')) | ||||
r1 | ||||
@request.addfinalizer | ||||
def rollback(): | ||||
session.rollback() | ||||
HOOK_PRE_PUSH = db.RhodeCodeUi.HOOK_PRE_PUSH | ||||
r1461 | HOOK_PRETX_PUSH = db.RhodeCodeUi.HOOK_PRETX_PUSH | |||
r1 | HOOK_PUSH = db.RhodeCodeUi.HOOK_PUSH | |||
HOOK_PRE_PULL = db.RhodeCodeUi.HOOK_PRE_PULL | ||||
HOOK_PULL = db.RhodeCodeUi.HOOK_PULL | ||||
HOOK_REPO_SIZE = db.RhodeCodeUi.HOOK_REPO_SIZE | ||||
r1755 | HOOK_PUSH_KEY = db.RhodeCodeUi.HOOK_PUSH_KEY | |||
r1 | ||||
HG_HOOKS = frozenset( | ||||
r1461 | (HOOK_PRE_PULL, HOOK_PULL, HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_PUSH, | |||
r1755 | HOOK_REPO_SIZE, HOOK_PUSH_KEY)) | |||
r1 | ||||
@pytest.mark.parametrize('disabled_hooks,expected_hooks', [ | ||||
([], HG_HOOKS), | ||||
(HG_HOOKS, []), | ||||
r1461 | ||||
r1755 | ([HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_REPO_SIZE, HOOK_PUSH_KEY], [HOOK_PRE_PULL, HOOK_PULL, HOOK_PUSH]), | |||
r1461 | ||||
r1 | # When a pull/push hook is disabled, its pre-pull/push counterpart should | |||
# be disabled too. | ||||
([HOOK_PUSH], [HOOK_PRE_PULL, HOOK_PULL, HOOK_REPO_SIZE]), | ||||
r1755 | ([HOOK_PULL], [HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_PUSH, HOOK_REPO_SIZE, | |||
HOOK_PUSH_KEY]), | ||||
r1 | ]) | |||
r2351 | def test_make_db_config_hg_hooks(baseapp, request, disabled_hooks, | |||
r1 | expected_hooks): | |||
disable_hooks(request, disabled_hooks) | ||||
config = utils.make_db_config() | ||||
hooks = extract_hooks(config) | ||||
r5087 | assert set(hooks.keys()).intersection(HG_HOOKS) == set(expected_hooks) | |||
r1 | ||||
@pytest.mark.parametrize('disabled_hooks,expected_hooks', [ | ||||
([], ['pull', 'push']), | ||||
([HOOK_PUSH], ['pull']), | ||||
([HOOK_PULL], ['push']), | ||||
([HOOK_PULL, HOOK_PUSH], []), | ||||
]) | ||||
def test_get_enabled_hook_classes(disabled_hooks, expected_hooks): | ||||
hook_keys = (HOOK_PUSH, HOOK_PULL) | ||||
ui_settings = [ | ||||
('hooks', key, 'some value', key not in disabled_hooks) | ||||
for key in hook_keys] | ||||
result = utils.get_enabled_hook_classes(ui_settings) | ||||
assert sorted(result) == expected_hooks | ||||
r2351 | def test_get_filesystem_repos_finds_repos(tmpdir, baseapp): | |||
r1 | _stub_git_repo(tmpdir.ensure('repo', dir=True)) | |||
repos = list(utils.get_filesystem_repos(str(tmpdir))) | ||||
assert repos == [('repo', ('git', tmpdir.join('repo')))] | ||||
r2351 | def test_get_filesystem_repos_skips_directories(tmpdir, baseapp): | |||
r1 | tmpdir.ensure('not-a-repo', dir=True) | |||
repos = list(utils.get_filesystem_repos(str(tmpdir))) | ||||
assert repos == [] | ||||
r2351 | def test_get_filesystem_repos_skips_directories_with_repos(tmpdir, baseapp): | |||
r1 | _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True)) | |||
repos = list(utils.get_filesystem_repos(str(tmpdir))) | ||||
assert repos == [] | ||||
r2351 | def test_get_filesystem_repos_finds_repos_in_subdirectories(tmpdir, baseapp): | |||
r1 | _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True)) | |||
repos = list(utils.get_filesystem_repos(str(tmpdir), recursive=True)) | ||||
assert repos == [('subdir/repo', ('git', tmpdir.join('subdir', 'repo')))] | ||||
def test_get_filesystem_repos_skips_names_starting_with_dot(tmpdir): | ||||
_stub_git_repo(tmpdir.ensure('.repo', dir=True)) | ||||
repos = list(utils.get_filesystem_repos(str(tmpdir))) | ||||
assert repos == [] | ||||
def test_get_filesystem_repos_skips_files(tmpdir): | ||||
tmpdir.ensure('test-file') | ||||
repos = list(utils.get_filesystem_repos(str(tmpdir))) | ||||
assert repos == [] | ||||
def test_get_filesystem_repos_skips_removed_repositories(tmpdir): | ||||
removed_repo_name = 'rm__00000000_000000_000000__.stub' | ||||
assert utils.REMOVED_REPO_PAT.match(removed_repo_name) | ||||
_stub_git_repo(tmpdir.ensure(removed_repo_name, dir=True)) | ||||
repos = list(utils.get_filesystem_repos(str(tmpdir))) | ||||
assert repos == [] | ||||
def _stub_git_repo(repo_path): | ||||
""" | ||||
Make `repo_path` look like a Git repository. | ||||
""" | ||||
repo_path.ensure('.git', dir=True) | ||||
r5087 | def test_get_dirpaths_returns_all_paths_on_str(tmpdir): | |||
r1 | tmpdir.ensure('test-file') | |||
r5087 | tmpdir.ensure('test-file-1') | |||
tmp_path = str(tmpdir) | ||||
dirpaths = utils.get_dirpaths(tmp_path) | ||||
assert list(sorted(dirpaths)) == ['test-file', 'test-file-1'] | ||||
def test_get_dirpaths_returns_all_paths_on_bytes(tmpdir): | ||||
tmpdir.ensure('test-file-bytes') | ||||
tmp_path = str(tmpdir) | ||||
dirpaths = utils.get_dirpaths(safe_bytes(tmp_path)) | ||||
assert list(sorted(dirpaths)) == [b'test-file-bytes'] | ||||
r1 | ||||
def test_get_dirpaths_returns_all_paths_bytes( | ||||
tmpdir, platform_encodes_filenames): | ||||
if platform_encodes_filenames: | ||||
pytest.skip("This platform seems to encode filenames.") | ||||
tmpdir.ensure('repo-a-umlaut-\xe4') | ||||
r5087 | dirpaths = utils.get_dirpaths(str(tmpdir)) | |||
r1 | assert dirpaths == ['repo-a-umlaut-\xe4'] | |||
def test_get_dirpaths_skips_paths_it_cannot_decode( | ||||
tmpdir, platform_encodes_filenames): | ||||
if platform_encodes_filenames: | ||||
pytest.skip("This platform seems to encode filenames.") | ||||
path_with_latin1 = 'repo-a-umlaut-\xe4' | ||||
r5087 | tmp_path = str(tmpdir.ensure(path_with_latin1)) | |||
dirpaths = utils.get_dirpaths(tmp_path) | ||||
r1 | assert dirpaths == [] | |||
@pytest.fixture(scope='session') | ||||
def platform_encodes_filenames(): | ||||
""" | ||||
Boolean indicator if the current platform changes filename encodings. | ||||
""" | ||||
path_with_latin1 = 'repo-a-umlaut-\xe4' | ||||
tmpdir = py.path.local.mkdtemp() | ||||
tmpdir.ensure(path_with_latin1) | ||||
read_path = tmpdir.listdir()[0].basename | ||||
tmpdir.remove() | ||||
return path_with_latin1 != read_path | ||||
def test_repo2db_mapper_groups(repo_groups): | ||||
session = meta.Session() | ||||
zombie_group, parent_group, child_group = repo_groups | ||||
zombie_path = os.path.join( | ||||
RepoGroupModel().repos_path, zombie_group.full_path) | ||||
os.rmdir(zombie_path) | ||||
# Avoid removing test repos when calling repo2db_mapper | ||||
repo_list = { | ||||
repo.repo_name: 'test' for repo in session.query(db.Repository).all() | ||||
} | ||||
utils.repo2db_mapper(repo_list, remove_obsolete=True) | ||||
groups_in_db = session.query(db.RepoGroup).all() | ||||
assert child_group in groups_in_db | ||||
assert parent_group in groups_in_db | ||||
assert zombie_path not in groups_in_db | ||||
def test_repo2db_mapper_enables_largefiles(backend): | ||||
repo = backend.create_repo() | ||||
repo_list = {repo.repo_name: 'test'} | ||||
with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm_mock: | ||||
r2677 | utils.repo2db_mapper(repo_list, remove_obsolete=False) | |||
_, kwargs = scm_mock.call_args | ||||
assert kwargs['config'].get('extensions', 'largefiles') == '' | ||||
r1 | ||||
@pytest.mark.backends("git", "svn") | ||||
def test_repo2db_mapper_installs_hooks_for_repos_in_db(backend): | ||||
repo = backend.create_repo() | ||||
repo_list = {repo.repo_name: 'test'} | ||||
r2677 | utils.repo2db_mapper(repo_list, remove_obsolete=False) | |||
r1 | ||||
@pytest.mark.backends("git", "svn") | ||||
def test_repo2db_mapper_installs_hooks_for_newly_added_repos(backend): | ||||
repo = backend.create_repo() | ||||
RepoModel().delete(repo, fs_remove=False) | ||||
meta.Session().commit() | ||||
repo_list = {repo.repo_name: repo.scm_instance()} | ||||
r2677 | utils.repo2db_mapper(repo_list, remove_obsolete=False) | |||
r1 | ||||
class TestPasswordChanged(object): | ||||
r5087 | ||||
def setup_method(self): | ||||
r1 | self.session = { | |||
'rhodecode_user': { | ||||
'password': '0cc175b9c0f1b6a831c399e269772661' | ||||
} | ||||
} | ||||
self.auth_user = mock.Mock() | ||||
self.auth_user.userame = 'test' | ||||
self.auth_user.password = 'abc123' | ||||
def test_returns_false_for_default_user(self): | ||||
self.auth_user.username = db.User.DEFAULT_USER | ||||
result = utils.password_changed(self.auth_user, self.session) | ||||
assert result is False | ||||
def test_returns_false_if_password_was_not_changed(self): | ||||
r5087 | self.session['rhodecode_user']['password'] = md5_safe( | |||
r1 | self.auth_user.password) | |||
result = utils.password_changed(self.auth_user, self.session) | ||||
assert result is False | ||||
def test_returns_true_if_password_was_changed(self): | ||||
result = utils.password_changed(self.auth_user, self.session) | ||||
assert result is True | ||||
def test_returns_true_if_auth_user_password_is_empty(self): | ||||
self.auth_user.password = None | ||||
result = utils.password_changed(self.auth_user, self.session) | ||||
assert result is True | ||||
def test_returns_true_if_session_password_is_empty(self): | ||||
self.session['rhodecode_user'].pop('password') | ||||
result = utils.password_changed(self.auth_user, self.session) | ||||
assert result is True | ||||
r3073 | class TestReadOpenSourceLicenses(object): | |||
r1 | def test_success(self): | |||
utils._license_cache = None | ||||
json_data = ''' | ||||
{ | ||||
"python2.7-pytest-2.7.1": {"UNKNOWN": null}, | ||||
"python2.7-Markdown-2.6.2": { | ||||
"BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause" | ||||
} | ||||
} | ||||
''' | ||||
resource_string_patch = mock.patch.object( | ||||
utils.pkg_resources, 'resource_string', return_value=json_data) | ||||
with resource_string_patch: | ||||
result = utils.read_opensource_licenses() | ||||
assert result == json.loads(json_data) | ||||
def test_caching(self): | ||||
utils._license_cache = { | ||||
"python2.7-pytest-2.7.1": { | ||||
"UNKNOWN": None | ||||
}, | ||||
"python2.7-Markdown-2.6.2": { | ||||
"BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause" | ||||
} | ||||
} | ||||
resource_patch = mock.patch.object( | ||||
utils.pkg_resources, 'resource_string', side_effect=Exception) | ||||
json_patch = mock.patch.object( | ||||
utils.json, 'loads', side_effect=Exception) | ||||
with resource_patch as resource_mock, json_patch as json_mock: | ||||
result = utils.read_opensource_licenses() | ||||
assert resource_mock.call_count == 0 | ||||
assert json_mock.call_count == 0 | ||||
assert result == utils._license_cache | ||||
def test_licenses_file_contains_no_unknown_licenses(self): | ||||
utils._license_cache = None | ||||
result = utils.read_opensource_licenses() | ||||
r3073 | ||||
for license_data in result: | ||||
if isinstance(license_data["license"], list): | ||||
for lic_data in license_data["license"]: | ||||
assert 'UNKNOWN' not in lic_data["fullName"] | ||||
else: | ||||
full_name = license_data.get("fullName") or license_data | ||||
assert 'UNKNOWN' not in full_name | ||||
r1 | ||||
class TestMakeDbConfig(object): | ||||
def test_data_from_config_data_from_db_returned(self): | ||||
test_data = [ | ||||
('section1', 'option1', 'value1'), | ||||
('section2', 'option2', 'value2'), | ||||
('section3', 'option3', 'value3'), | ||||
] | ||||
with mock.patch.object(utils, 'config_data_from_db') as config_mock: | ||||
config_mock.return_value = test_data | ||||
kwargs = {'clear_session': False, 'repo': 'test_repo'} | ||||
result = utils.make_db_config(**kwargs) | ||||
config_mock.assert_called_once_with(**kwargs) | ||||
for section, option, expected_value in test_data: | ||||
value = result.get(section, option) | ||||
assert value == expected_value | ||||
class TestConfigDataFromDb(object): | ||||
def test_config_data_from_db_returns_active_settings(self): | ||||
test_data = [ | ||||
UiSetting('section1', 'option1', 'value1', True), | ||||
UiSetting('section2', 'option2', 'value2', True), | ||||
UiSetting('section3', 'option3', 'value3', False), | ||||
] | ||||
repo_name = 'test_repo' | ||||
r274 | model_patch = mock.patch.object(settings, 'VcsSettingsModel') | |||
r1 | hooks_patch = mock.patch.object( | |||
utils, 'get_enabled_hook_classes', | ||||
return_value=['pull', 'push', 'repo_size']) | ||||
with model_patch as model_mock, hooks_patch: | ||||
instance_mock = mock.Mock() | ||||
model_mock.return_value = instance_mock | ||||
instance_mock.get_ui_settings.return_value = test_data | ||||
result = utils.config_data_from_db( | ||||
clear_session=False, repo=repo_name) | ||||
self._assert_repo_name_passed(model_mock, repo_name) | ||||
expected_result = [ | ||||
('section1', 'option1', 'value1'), | ||||
('section2', 'option2', 'value2'), | ||||
] | ||||
assert result == expected_result | ||||
def _assert_repo_name_passed(self, model_mock, repo_name): | ||||
assert model_mock.call_count == 1 | ||||
call_args, call_kwargs = model_mock.call_args | ||||
assert call_kwargs['repo'] == repo_name | ||||
class TestIsDirWritable(object): | ||||
def test_returns_false_when_not_writable(self): | ||||
r5087 | with mock.patch('builtins.open', side_effect=OSError): | |||
r1 | assert not utils._is_dir_writable('/stub-path') | |||
def test_returns_true_when_writable(self, tmpdir): | ||||
assert utils._is_dir_writable(str(tmpdir)) | ||||
def test_is_safe_against_race_conditions(self, tmpdir): | ||||
workers = multiprocessing.Pool() | ||||
directories = [str(tmpdir)] * 10 | ||||
workers.map(utils._is_dir_writable, directories) | ||||
class TestGetEnabledHooks(object): | ||||
def test_only_active_hooks_are_enabled(self): | ||||
ui_settings = [ | ||||
UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True), | ||||
UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True), | ||||
UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', False) | ||||
] | ||||
result = utils.get_enabled_hook_classes(ui_settings) | ||||
assert result == ['push', 'repo_size'] | ||||
def test_all_hooks_are_enabled(self): | ||||
ui_settings = [ | ||||
UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True), | ||||
UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True), | ||||
UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', True) | ||||
] | ||||
result = utils.get_enabled_hook_classes(ui_settings) | ||||
assert result == ['push', 'repo_size', 'pull'] | ||||
def test_no_enabled_hooks_when_no_hook_settings_are_found(self): | ||||
ui_settings = [] | ||||
result = utils.get_enabled_hook_classes(ui_settings) | ||||
assert result == [] | ||||
r2359 | ||||
def test_obfuscate_url_pw(): | ||||
from rhodecode.lib.utils2 import obfuscate_url_pw | ||||
engine = u'/home/repos/malmö' | ||||
r4862 | assert obfuscate_url_pw(engine) | |||
@pytest.mark.parametrize("test_ua, expected", [ | ||||
("", ""), | ||||
('"quoted"', 'quoted'), | ||||
('internal-merge', 'internal-merge'), | ||||
('hg/internal-merge', 'hg/internal-merge'), | ||||
('git/internal-merge', 'git/internal-merge'), | ||||
# git | ||||
('git/2.10.1 (Apple Git-78)', 'git/2.10.1'), | ||||
('GiT/2.37.2.windows.2', 'git/2.37.2'), | ||||
('git/2.35.1 (Microsoft Windows NT 10.0.19044.0; Win32NT x64) CLR/4.0.30319 VS16/16.0.0', 'git/2.35.1'), | ||||
('ssh-user-agent', 'ssh-user-agent'), | ||||
('git/ssh-user-agent', 'git/ssh-user-agent'), | ||||
# hg | ||||
('mercurial/proto-1.0 (Mercurial 4.2)', 'mercurial/4.2'), | ||||
('mercurial/proto-1.0', ''), | ||||
('mercurial/proto-1.0 (Mercurial 3.9.2)', 'mercurial/3.9.2'), | ||||
('mercurial/ssh-user-agent', 'mercurial/ssh-user-agent'), | ||||
('mercurial/proto-1.0 (Mercurial 5.8rc0)', 'mercurial/5.8rc0'), | ||||
]) | ||||
def test_user_agent_normalizer(test_ua, expected): | ||||
from rhodecode.lib.utils2 import user_agent_normalizer | ||||
assert user_agent_normalizer(test_ua, safe=False) == expected | ||||