|
|
# RhodeCode VCSServer provides access to different vcs backends via network.
|
|
|
# Copyright (C) 2014-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
|
# (at your option) any later version.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import stat
|
|
|
import pytest
|
|
|
import vcsserver
|
|
|
import tempfile
|
|
|
from vcsserver import hook_utils
|
|
|
from vcsserver.hook_utils import set_permissions_if_needed, HOOKS_DIR_MODE, HOOKS_FILE_MODE
|
|
|
from vcsserver.tests.fixture import no_newline_id_generator
|
|
|
from vcsserver.str_utils import safe_bytes
|
|
|
from vcsserver.utils import AttributeDict
|
|
|
|
|
|
|
|
|
class TestCheckRhodecodeHook:
|
|
|
|
|
|
def test_returns_false_when_hook_file_is_wrong_found(self, tmpdir):
|
|
|
hook = os.path.join(str(tmpdir), 'fake_hook_file.py')
|
|
|
with open(hook, 'wb') as f:
|
|
|
f.write(b'dummy test')
|
|
|
result = hook_utils.check_rhodecode_hook(hook)
|
|
|
assert result is False
|
|
|
|
|
|
def test_returns_true_when_no_hook_file_found(self, tmpdir):
|
|
|
hook = os.path.join(str(tmpdir), 'fake_hook_file_not_existing.py')
|
|
|
result = hook_utils.check_rhodecode_hook(hook)
|
|
|
assert result
|
|
|
|
|
|
@pytest.mark.parametrize("file_content, expected_result", [
|
|
|
("RC_HOOK_VER = '3.3.3'\n", True),
|
|
|
("RC_HOOK = '3.3.3'\n", False),
|
|
|
], ids=no_newline_id_generator)
|
|
|
def test_signatures(self, file_content, expected_result, tmpdir):
|
|
|
hook = os.path.join(str(tmpdir), 'fake_hook_file_1.py')
|
|
|
with open(hook, 'wb') as f:
|
|
|
f.write(safe_bytes(file_content))
|
|
|
|
|
|
result = hook_utils.check_rhodecode_hook(hook)
|
|
|
|
|
|
assert result is expected_result
|
|
|
|
|
|
|
|
|
class BaseInstallHooks:
|
|
|
HOOK_FILES = ()
|
|
|
|
|
|
def _check_hook_file_dir_mode(self, file_path):
|
|
|
dir_path = os.path.dirname(file_path)
|
|
|
assert os.path.exists(dir_path), f'dir {file_path} missing'
|
|
|
stat_info = os.stat(dir_path)
|
|
|
|
|
|
file_mode = stat.S_IMODE(stat_info.st_mode)
|
|
|
expected_mode = int(HOOKS_DIR_MODE)
|
|
|
assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {dir_path}'
|
|
|
|
|
|
def _check_hook_file_mode(self, file_path):
|
|
|
assert os.path.exists(file_path), f'path {file_path} missing'
|
|
|
stat_info = os.stat(file_path)
|
|
|
|
|
|
file_mode = stat.S_IMODE(stat_info.st_mode)
|
|
|
expected_mode = int(HOOKS_FILE_MODE)
|
|
|
assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {file_path}'
|
|
|
|
|
|
def _check_hook_file_content(self, file_path, executable):
|
|
|
executable = executable or sys.executable
|
|
|
with open(file_path, 'rt') as hook_file:
|
|
|
content = hook_file.read()
|
|
|
|
|
|
expected_env = '#!{}'.format(executable)
|
|
|
expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(vcsserver.get_version())
|
|
|
assert content.strip().startswith(expected_env)
|
|
|
assert expected_rc_version in content
|
|
|
|
|
|
def _create_fake_hook(self, file_path, content):
|
|
|
with open(file_path, 'w') as hook_file:
|
|
|
hook_file.write(content)
|
|
|
|
|
|
def create_dummy_repo(self, repo_type):
|
|
|
tmpdir = tempfile.mkdtemp()
|
|
|
repo = AttributeDict()
|
|
|
if repo_type == 'git':
|
|
|
repo.path = os.path.join(tmpdir, 'test_git_hooks_installation_repo')
|
|
|
os.makedirs(repo.path)
|
|
|
os.makedirs(os.path.join(repo.path, 'hooks'))
|
|
|
repo.bare = True
|
|
|
|
|
|
elif repo_type == 'svn':
|
|
|
repo.path = os.path.join(tmpdir, 'test_svn_hooks_installation_repo')
|
|
|
os.makedirs(repo.path)
|
|
|
os.makedirs(os.path.join(repo.path, 'hooks'))
|
|
|
|
|
|
return repo
|
|
|
|
|
|
def check_hooks(self, repo_path, repo_bare=True):
|
|
|
for file_name in self.HOOK_FILES:
|
|
|
if repo_bare:
|
|
|
file_path = os.path.join(repo_path, 'hooks', file_name)
|
|
|
else:
|
|
|
file_path = os.path.join(repo_path, '.git', 'hooks', file_name)
|
|
|
|
|
|
self._check_hook_file_dir_mode(file_path)
|
|
|
self._check_hook_file_mode(file_path)
|
|
|
self._check_hook_file_content(file_path, sys.executable)
|
|
|
|
|
|
|
|
|
class TestInstallGitHooks(BaseInstallHooks):
|
|
|
HOOK_FILES = ('pre-receive', 'post-receive')
|
|
|
|
|
|
def test_hooks_are_installed(self):
|
|
|
repo = self.create_dummy_repo('git')
|
|
|
result = hook_utils.install_git_hooks(repo.path, repo.bare)
|
|
|
assert result
|
|
|
self.check_hooks(repo.path, repo.bare)
|
|
|
|
|
|
def test_hooks_are_replaced(self):
|
|
|
repo = self.create_dummy_repo('git')
|
|
|
hooks_path = os.path.join(repo.path, 'hooks')
|
|
|
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
|
|
|
self._create_fake_hook(
|
|
|
file_path, content="RC_HOOK_VER = 'abcde'\n")
|
|
|
|
|
|
result = hook_utils.install_git_hooks(repo.path, repo.bare)
|
|
|
assert result
|
|
|
self.check_hooks(repo.path, repo.bare)
|
|
|
|
|
|
def test_non_rc_hooks_are_not_replaced(self):
|
|
|
repo = self.create_dummy_repo('git')
|
|
|
hooks_path = os.path.join(repo.path, 'hooks')
|
|
|
non_rc_content = 'echo "non rc hook"\n'
|
|
|
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
|
|
|
self._create_fake_hook(
|
|
|
file_path, content=non_rc_content)
|
|
|
|
|
|
result = hook_utils.install_git_hooks(repo.path, repo.bare)
|
|
|
assert result
|
|
|
|
|
|
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
|
|
|
with open(file_path, 'rt') as hook_file:
|
|
|
content = hook_file.read()
|
|
|
assert content == non_rc_content
|
|
|
|
|
|
def test_non_rc_hooks_are_replaced_with_force_flag(self):
|
|
|
repo = self.create_dummy_repo('git')
|
|
|
hooks_path = os.path.join(repo.path, 'hooks')
|
|
|
non_rc_content = 'echo "non rc hook"\n'
|
|
|
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
|
|
|
self._create_fake_hook(
|
|
|
file_path, content=non_rc_content)
|
|
|
|
|
|
result = hook_utils.install_git_hooks(
|
|
|
repo.path, repo.bare, force_create=True)
|
|
|
assert result
|
|
|
self.check_hooks(repo.path, repo.bare)
|
|
|
|
|
|
|
|
|
class TestInstallSvnHooks(BaseInstallHooks):
|
|
|
HOOK_FILES = ('pre-commit', 'post-commit')
|
|
|
|
|
|
def test_hooks_are_installed(self):
|
|
|
repo = self.create_dummy_repo('svn')
|
|
|
result = hook_utils.install_svn_hooks(repo.path)
|
|
|
assert result
|
|
|
self.check_hooks(repo.path)
|
|
|
|
|
|
def test_hooks_are_replaced(self):
|
|
|
repo = self.create_dummy_repo('svn')
|
|
|
hooks_path = os.path.join(repo.path, 'hooks')
|
|
|
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
|
|
|
self._create_fake_hook(
|
|
|
file_path, content="RC_HOOK_VER = 'abcde'\n")
|
|
|
|
|
|
result = hook_utils.install_svn_hooks(repo.path)
|
|
|
assert result
|
|
|
self.check_hooks(repo.path)
|
|
|
|
|
|
def test_non_rc_hooks_are_not_replaced(self):
|
|
|
repo = self.create_dummy_repo('svn')
|
|
|
hooks_path = os.path.join(repo.path, 'hooks')
|
|
|
non_rc_content = 'echo "non rc hook"\n'
|
|
|
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
|
|
|
self._create_fake_hook(
|
|
|
file_path, content=non_rc_content)
|
|
|
|
|
|
result = hook_utils.install_svn_hooks(repo.path)
|
|
|
assert result
|
|
|
|
|
|
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
|
|
|
with open(file_path, 'rt') as hook_file:
|
|
|
content = hook_file.read()
|
|
|
assert content == non_rc_content
|
|
|
|
|
|
def test_non_rc_hooks_are_replaced_with_force_flag(self):
|
|
|
repo = self.create_dummy_repo('svn')
|
|
|
hooks_path = os.path.join(repo.path, 'hooks')
|
|
|
non_rc_content = 'echo "non rc hook"\n'
|
|
|
for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
|
|
|
self._create_fake_hook(
|
|
|
file_path, content=non_rc_content)
|
|
|
|
|
|
result = hook_utils.install_svn_hooks(
|
|
|
repo.path, force_create=True)
|
|
|
assert result
|
|
|
self.check_hooks(repo.path, )
|
|
|
|
|
|
|
|
|
def create_test_file(filename):
|
|
|
"""Utility function to create a test file."""
|
|
|
with open(filename, 'w') as f:
|
|
|
f.write("Test file")
|
|
|
|
|
|
|
|
|
def remove_test_file(filename):
|
|
|
"""Utility function to remove a test file."""
|
|
|
if os.path.exists(filename):
|
|
|
os.remove(filename)
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
def test_file():
|
|
|
filename = 'test_file.txt'
|
|
|
create_test_file(filename)
|
|
|
yield filename
|
|
|
remove_test_file(filename)
|
|
|
|
|
|
|
|
|
def test_increase_permissions(test_file):
|
|
|
# Set initial lower permissions
|
|
|
initial_perms = 0o644
|
|
|
os.chmod(test_file, initial_perms)
|
|
|
|
|
|
# Set higher permissions
|
|
|
new_perms = 0o666
|
|
|
set_permissions_if_needed(test_file, new_perms)
|
|
|
|
|
|
# Check if permissions were updated
|
|
|
assert (os.stat(test_file).st_mode & 0o777) == new_perms
|
|
|
|
|
|
|
|
|
def test_no_permission_change_needed(test_file):
|
|
|
# Set initial permissions
|
|
|
initial_perms = 0o666
|
|
|
os.chmod(test_file, initial_perms)
|
|
|
|
|
|
# Attempt to set the same permissions
|
|
|
set_permissions_if_needed(test_file, initial_perms)
|
|
|
|
|
|
# Check if permissions were unchanged
|
|
|
assert (os.stat(test_file).st_mode & 0o777) == initial_perms
|
|
|
|
|
|
|
|
|
def test_no_permission_reduction(test_file):
|
|
|
# Set initial higher permissions
|
|
|
initial_perms = 0o666
|
|
|
os.chmod(test_file, initial_perms)
|
|
|
|
|
|
# Attempt to set lower permissions
|
|
|
lower_perms = 0o644
|
|
|
set_permissions_if_needed(test_file, lower_perms)
|
|
|
|
|
|
# Check if permissions were not reduced
|
|
|
assert (os.stat(test_file).st_mode & 0o777) == initial_perms
|
|
|
|
|
|
|
|
|
def test_no_permission_reduction_when_on_777(test_file):
|
|
|
# Set initial higher permissions
|
|
|
initial_perms = 0o777
|
|
|
os.chmod(test_file, initial_perms)
|
|
|
|
|
|
# Attempt to set lower permissions
|
|
|
lower_perms = 0o755
|
|
|
set_permissions_if_needed(test_file, lower_perms)
|
|
|
|
|
|
# Check if permissions were not reduced
|
|
|
assert (os.stat(test_file).st_mode & 0o777) == initial_perms
|
|
|
|