test_install_hooks.py
289 lines
| 10.3 KiB
| text/x-python
|
PythonLexer
r407 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
r1327 | # Copyright (C) 2014-2024 RhodeCode GmbH | |||
r407 | # | |||
# This program is free software; you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation; either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program; if not, write to the Free Software Foundation, | ||||
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||||
import os | ||||
import sys | ||||
import stat | ||||
import pytest | ||||
import vcsserver | ||||
import tempfile | ||||
from vcsserver import hook_utils | ||||
r1186 | from vcsserver.hook_utils import set_permissions_if_needed, HOOKS_DIR_MODE, HOOKS_FILE_MODE | |||
r407 | from vcsserver.tests.fixture import no_newline_id_generator | |||
r1249 | from vcsserver.lib.str_utils import safe_bytes | |||
r1060 | from vcsserver.utils import AttributeDict | |||
r407 | ||||
r1152 | class TestCheckRhodecodeHook: | |||
r407 | ||||
def test_returns_false_when_hook_file_is_wrong_found(self, tmpdir): | ||||
hook = os.path.join(str(tmpdir), 'fake_hook_file.py') | ||||
with open(hook, 'wb') as f: | ||||
r1048 | f.write(b'dummy test') | |||
r407 | result = hook_utils.check_rhodecode_hook(hook) | |||
assert result is False | ||||
def test_returns_true_when_no_hook_file_found(self, tmpdir): | ||||
hook = os.path.join(str(tmpdir), 'fake_hook_file_not_existing.py') | ||||
result = hook_utils.check_rhodecode_hook(hook) | ||||
assert result | ||||
@pytest.mark.parametrize("file_content, expected_result", [ | ||||
("RC_HOOK_VER = '3.3.3'\n", True), | ||||
("RC_HOOK = '3.3.3'\n", False), | ||||
], ids=no_newline_id_generator) | ||||
def test_signatures(self, file_content, expected_result, tmpdir): | ||||
hook = os.path.join(str(tmpdir), 'fake_hook_file_1.py') | ||||
with open(hook, 'wb') as f: | ||||
r1048 | f.write(safe_bytes(file_content)) | |||
r407 | ||||
result = hook_utils.check_rhodecode_hook(hook) | ||||
assert result is expected_result | ||||
r1152 | class BaseInstallHooks: | |||
r407 | HOOK_FILES = () | |||
r1186 | def _check_hook_file_dir_mode(self, file_path): | |||
dir_path = os.path.dirname(file_path) | ||||
assert os.path.exists(dir_path), f'dir {file_path} missing' | ||||
stat_info = os.stat(dir_path) | ||||
file_mode = stat.S_IMODE(stat_info.st_mode) | ||||
expected_mode = int(HOOKS_DIR_MODE) | ||||
assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {dir_path}' | ||||
r407 | def _check_hook_file_mode(self, file_path): | |||
r1153 | assert os.path.exists(file_path), f'path {file_path} missing' | |||
r407 | stat_info = os.stat(file_path) | |||
file_mode = stat.S_IMODE(stat_info.st_mode) | ||||
r1186 | expected_mode = int(HOOKS_FILE_MODE) | |||
assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {file_path}' | ||||
r407 | ||||
def _check_hook_file_content(self, file_path, executable): | ||||
executable = executable or sys.executable | ||||
with open(file_path, 'rt') as hook_file: | ||||
content = hook_file.read() | ||||
expected_env = '#!{}'.format(executable) | ||||
r1188 | expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(vcsserver.get_version()) | |||
r407 | assert content.strip().startswith(expected_env) | |||
assert expected_rc_version in content | ||||
def _create_fake_hook(self, file_path, content): | ||||
with open(file_path, 'w') as hook_file: | ||||
hook_file.write(content) | ||||
def create_dummy_repo(self, repo_type): | ||||
tmpdir = tempfile.mkdtemp() | ||||
repo = AttributeDict() | ||||
if repo_type == 'git': | ||||
repo.path = os.path.join(tmpdir, 'test_git_hooks_installation_repo') | ||||
os.makedirs(repo.path) | ||||
os.makedirs(os.path.join(repo.path, 'hooks')) | ||||
repo.bare = True | ||||
elif repo_type == 'svn': | ||||
repo.path = os.path.join(tmpdir, 'test_svn_hooks_installation_repo') | ||||
os.makedirs(repo.path) | ||||
os.makedirs(os.path.join(repo.path, 'hooks')) | ||||
return repo | ||||
def check_hooks(self, repo_path, repo_bare=True): | ||||
for file_name in self.HOOK_FILES: | ||||
if repo_bare: | ||||
file_path = os.path.join(repo_path, 'hooks', file_name) | ||||
else: | ||||
file_path = os.path.join(repo_path, '.git', 'hooks', file_name) | ||||
r1186 | ||||
self._check_hook_file_dir_mode(file_path) | ||||
r407 | self._check_hook_file_mode(file_path) | |||
self._check_hook_file_content(file_path, sys.executable) | ||||
class TestInstallGitHooks(BaseInstallHooks): | ||||
HOOK_FILES = ('pre-receive', 'post-receive') | ||||
def test_hooks_are_installed(self): | ||||
repo = self.create_dummy_repo('git') | ||||
result = hook_utils.install_git_hooks(repo.path, repo.bare) | ||||
assert result | ||||
self.check_hooks(repo.path, repo.bare) | ||||
def test_hooks_are_replaced(self): | ||||
repo = self.create_dummy_repo('git') | ||||
hooks_path = os.path.join(repo.path, 'hooks') | ||||
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | ||||
self._create_fake_hook( | ||||
file_path, content="RC_HOOK_VER = 'abcde'\n") | ||||
result = hook_utils.install_git_hooks(repo.path, repo.bare) | ||||
assert result | ||||
self.check_hooks(repo.path, repo.bare) | ||||
def test_non_rc_hooks_are_not_replaced(self): | ||||
repo = self.create_dummy_repo('git') | ||||
hooks_path = os.path.join(repo.path, 'hooks') | ||||
non_rc_content = 'echo "non rc hook"\n' | ||||
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | ||||
self._create_fake_hook( | ||||
file_path, content=non_rc_content) | ||||
result = hook_utils.install_git_hooks(repo.path, repo.bare) | ||||
assert result | ||||
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | ||||
with open(file_path, 'rt') as hook_file: | ||||
content = hook_file.read() | ||||
assert content == non_rc_content | ||||
def test_non_rc_hooks_are_replaced_with_force_flag(self): | ||||
repo = self.create_dummy_repo('git') | ||||
hooks_path = os.path.join(repo.path, 'hooks') | ||||
non_rc_content = 'echo "non rc hook"\n' | ||||
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | ||||
self._create_fake_hook( | ||||
file_path, content=non_rc_content) | ||||
result = hook_utils.install_git_hooks( | ||||
repo.path, repo.bare, force_create=True) | ||||
assert result | ||||
self.check_hooks(repo.path, repo.bare) | ||||
class TestInstallSvnHooks(BaseInstallHooks): | ||||
HOOK_FILES = ('pre-commit', 'post-commit') | ||||
def test_hooks_are_installed(self): | ||||
repo = self.create_dummy_repo('svn') | ||||
result = hook_utils.install_svn_hooks(repo.path) | ||||
assert result | ||||
self.check_hooks(repo.path) | ||||
def test_hooks_are_replaced(self): | ||||
repo = self.create_dummy_repo('svn') | ||||
hooks_path = os.path.join(repo.path, 'hooks') | ||||
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | ||||
self._create_fake_hook( | ||||
file_path, content="RC_HOOK_VER = 'abcde'\n") | ||||
result = hook_utils.install_svn_hooks(repo.path) | ||||
assert result | ||||
self.check_hooks(repo.path) | ||||
def test_non_rc_hooks_are_not_replaced(self): | ||||
repo = self.create_dummy_repo('svn') | ||||
hooks_path = os.path.join(repo.path, 'hooks') | ||||
non_rc_content = 'echo "non rc hook"\n' | ||||
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | ||||
self._create_fake_hook( | ||||
file_path, content=non_rc_content) | ||||
result = hook_utils.install_svn_hooks(repo.path) | ||||
assert result | ||||
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | ||||
with open(file_path, 'rt') as hook_file: | ||||
content = hook_file.read() | ||||
assert content == non_rc_content | ||||
def test_non_rc_hooks_are_replaced_with_force_flag(self): | ||||
repo = self.create_dummy_repo('svn') | ||||
hooks_path = os.path.join(repo.path, 'hooks') | ||||
non_rc_content = 'echo "non rc hook"\n' | ||||
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | ||||
self._create_fake_hook( | ||||
file_path, content=non_rc_content) | ||||
result = hook_utils.install_svn_hooks( | ||||
repo.path, force_create=True) | ||||
assert result | ||||
self.check_hooks(repo.path, ) | ||||
r1186 | ||||
def create_test_file(filename): | ||||
"""Utility function to create a test file.""" | ||||
with open(filename, 'w') as f: | ||||
f.write("Test file") | ||||
def remove_test_file(filename): | ||||
"""Utility function to remove a test file.""" | ||||
if os.path.exists(filename): | ||||
os.remove(filename) | ||||
@pytest.fixture | ||||
def test_file(): | ||||
filename = 'test_file.txt' | ||||
create_test_file(filename) | ||||
yield filename | ||||
remove_test_file(filename) | ||||
def test_increase_permissions(test_file): | ||||
# Set initial lower permissions | ||||
initial_perms = 0o644 | ||||
os.chmod(test_file, initial_perms) | ||||
# Set higher permissions | ||||
new_perms = 0o666 | ||||
set_permissions_if_needed(test_file, new_perms) | ||||
# Check if permissions were updated | ||||
assert (os.stat(test_file).st_mode & 0o777) == new_perms | ||||
def test_no_permission_change_needed(test_file): | ||||
# Set initial permissions | ||||
initial_perms = 0o666 | ||||
os.chmod(test_file, initial_perms) | ||||
# Attempt to set the same permissions | ||||
set_permissions_if_needed(test_file, initial_perms) | ||||
# Check if permissions were unchanged | ||||
assert (os.stat(test_file).st_mode & 0o777) == initial_perms | ||||
def test_no_permission_reduction(test_file): | ||||
# Set initial higher permissions | ||||
initial_perms = 0o666 | ||||
os.chmod(test_file, initial_perms) | ||||
# Attempt to set lower permissions | ||||
lower_perms = 0o644 | ||||
set_permissions_if_needed(test_file, lower_perms) | ||||
# Check if permissions were not reduced | ||||
assert (os.stat(test_file).st_mode & 0o777) == initial_perms | ||||
def test_no_permission_reduction_when_on_777(test_file): | ||||
# Set initial higher permissions | ||||
initial_perms = 0o777 | ||||
os.chmod(test_file, initial_perms) | ||||
# Attempt to set lower permissions | ||||
lower_perms = 0o755 | ||||
set_permissions_if_needed(test_file, lower_perms) | ||||
# Check if permissions were not reduced | ||||
assert (os.stat(test_file).st_mode & 0o777) == initial_perms | ||||