|
|
# RhodeCode VCSServer provides access to different vcs backends via network.
|
|
|
# Copyright (C) 2014-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
|
# (at your option) any later version.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
|
|
|
|
|
import re
|
|
|
import os
|
|
|
import sys
|
|
|
import datetime
|
|
|
import logging
|
|
|
import pkg_resources
|
|
|
|
|
|
import vcsserver
|
|
|
import vcsserver.settings
|
|
|
from vcsserver.str_utils import safe_bytes
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
HOOKS_DIR_MODE = 0o755
|
|
|
HOOKS_FILE_MODE = 0o755
|
|
|
|
|
|
|
|
|
def set_permissions_if_needed(path_to_check, perms: oct):
|
|
|
# Get current permissions
|
|
|
current_permissions = os.stat(path_to_check).st_mode & 0o777 # Extract permission bits
|
|
|
|
|
|
# Check if current permissions are lower than required
|
|
|
if current_permissions < int(perms):
|
|
|
# Change the permissions if they are lower than required
|
|
|
os.chmod(path_to_check, perms)
|
|
|
|
|
|
|
|
|
def get_git_hooks_path(repo_path, bare):
|
|
|
hooks_path = os.path.join(repo_path, 'hooks')
|
|
|
if not bare:
|
|
|
hooks_path = os.path.join(repo_path, '.git', 'hooks')
|
|
|
|
|
|
return hooks_path
|
|
|
|
|
|
|
|
|
def install_git_hooks(repo_path, bare, executable=None, force_create=False):
|
|
|
"""
|
|
|
Creates a RhodeCode hook inside a git repository
|
|
|
|
|
|
:param repo_path: path to repository
|
|
|
:param bare: defines if repository is considered a bare git repo
|
|
|
:param executable: binary executable to put in the hooks
|
|
|
:param force_create: Creates even if the same name hook exists
|
|
|
"""
|
|
|
executable = executable or sys.executable
|
|
|
hooks_path = get_git_hooks_path(repo_path, bare)
|
|
|
|
|
|
# we always call it to ensure dir exists and it has a proper mode
|
|
|
if not os.path.exists(hooks_path):
|
|
|
# If it doesn't exist, create a new directory with the specified mode
|
|
|
os.makedirs(hooks_path, mode=HOOKS_DIR_MODE, exist_ok=True)
|
|
|
# If it exists, change the directory's mode to the specified mode
|
|
|
set_permissions_if_needed(hooks_path, perms=HOOKS_DIR_MODE)
|
|
|
|
|
|
tmpl_post = pkg_resources.resource_string(
|
|
|
'vcsserver', '/'.join(
|
|
|
('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl')))
|
|
|
tmpl_pre = pkg_resources.resource_string(
|
|
|
'vcsserver', '/'.join(
|
|
|
('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl')))
|
|
|
|
|
|
path = '' # not used for now
|
|
|
timestamp = datetime.datetime.utcnow().isoformat()
|
|
|
|
|
|
for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
|
|
|
log.debug('Installing git hook in repo %s', repo_path)
|
|
|
_hook_file = os.path.join(hooks_path, f'{h_type}-receive')
|
|
|
_rhodecode_hook = check_rhodecode_hook(_hook_file)
|
|
|
|
|
|
if _rhodecode_hook or force_create:
|
|
|
log.debug('writing git %s hook file at %s !', h_type, _hook_file)
|
|
|
try:
|
|
|
with open(_hook_file, 'wb') as f:
|
|
|
template = template.replace(b'_TMPL_', safe_bytes(vcsserver.get_version()))
|
|
|
template = template.replace(b'_DATE_', safe_bytes(timestamp))
|
|
|
template = template.replace(b'_ENV_', safe_bytes(executable))
|
|
|
template = template.replace(b'_PATH_', safe_bytes(path))
|
|
|
f.write(template)
|
|
|
set_permissions_if_needed(_hook_file, perms=HOOKS_FILE_MODE)
|
|
|
except OSError:
|
|
|
log.exception('error writing hook file %s', _hook_file)
|
|
|
else:
|
|
|
log.debug('skipping writing hook file')
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
def get_svn_hooks_path(repo_path):
|
|
|
hooks_path = os.path.join(repo_path, 'hooks')
|
|
|
|
|
|
return hooks_path
|
|
|
|
|
|
|
|
|
def install_svn_hooks(repo_path, executable=None, force_create=False):
|
|
|
"""
|
|
|
Creates RhodeCode hooks inside a svn repository
|
|
|
|
|
|
:param repo_path: path to repository
|
|
|
:param executable: binary executable to put in the hooks
|
|
|
:param force_create: Create even if same name hook exists
|
|
|
"""
|
|
|
executable = executable or sys.executable
|
|
|
hooks_path = get_svn_hooks_path(repo_path)
|
|
|
if not os.path.isdir(hooks_path):
|
|
|
os.makedirs(hooks_path, mode=0o777, exist_ok=True)
|
|
|
|
|
|
tmpl_post = pkg_resources.resource_string(
|
|
|
'vcsserver', '/'.join(
|
|
|
('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl')))
|
|
|
tmpl_pre = pkg_resources.resource_string(
|
|
|
'vcsserver', '/'.join(
|
|
|
('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl')))
|
|
|
|
|
|
path = '' # not used for now
|
|
|
timestamp = datetime.datetime.utcnow().isoformat()
|
|
|
|
|
|
for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
|
|
|
log.debug('Installing svn hook in repo %s', repo_path)
|
|
|
_hook_file = os.path.join(hooks_path, f'{h_type}-commit')
|
|
|
_rhodecode_hook = check_rhodecode_hook(_hook_file)
|
|
|
|
|
|
if _rhodecode_hook or force_create:
|
|
|
log.debug('writing svn %s hook file at %s !', h_type, _hook_file)
|
|
|
|
|
|
env_expand = str([
|
|
|
('RC_CORE_BINARY_DIR', vcsserver.settings.BINARY_DIR),
|
|
|
('RC_GIT_EXECUTABLE', vcsserver.settings.GIT_EXECUTABLE()),
|
|
|
('RC_SVN_EXECUTABLE', vcsserver.settings.SVN_EXECUTABLE()),
|
|
|
('RC_SVNLOOK_EXECUTABLE', vcsserver.settings.SVNLOOK_EXECUTABLE()),
|
|
|
|
|
|
])
|
|
|
try:
|
|
|
with open(_hook_file, 'wb') as f:
|
|
|
template = template.replace(b'_TMPL_', safe_bytes(vcsserver.get_version()))
|
|
|
template = template.replace(b'_DATE_', safe_bytes(timestamp))
|
|
|
template = template.replace(b'_OS_EXPAND_', safe_bytes(env_expand))
|
|
|
template = template.replace(b'_ENV_', safe_bytes(executable))
|
|
|
template = template.replace(b'_PATH_', safe_bytes(path))
|
|
|
|
|
|
f.write(template)
|
|
|
os.chmod(_hook_file, 0o755)
|
|
|
except OSError:
|
|
|
log.exception('error writing hook file %s', _hook_file)
|
|
|
else:
|
|
|
log.debug('skipping writing hook file')
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
def get_version_from_hook(hook_path):
|
|
|
version = b''
|
|
|
hook_content = read_hook_content(hook_path)
|
|
|
matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content)
|
|
|
if matches:
|
|
|
try:
|
|
|
version = matches.groups()[0]
|
|
|
log.debug('got version %s from hooks.', version)
|
|
|
except Exception:
|
|
|
log.exception("Exception while reading the hook version.")
|
|
|
return version.replace(b"'", b"")
|
|
|
|
|
|
|
|
|
def check_rhodecode_hook(hook_path):
|
|
|
"""
|
|
|
Check if the hook was created by RhodeCode
|
|
|
"""
|
|
|
if not os.path.exists(hook_path):
|
|
|
return True
|
|
|
|
|
|
log.debug('hook exists, checking if it is from RhodeCode')
|
|
|
|
|
|
version = get_version_from_hook(hook_path)
|
|
|
if version:
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
def read_hook_content(hook_path) -> bytes:
|
|
|
content = b''
|
|
|
if os.path.isfile(hook_path):
|
|
|
with open(hook_path, 'rb') as f:
|
|
|
content = f.read()
|
|
|
return content
|
|
|
|
|
|
|
|
|
def get_git_pre_hook_version(repo_path, bare):
|
|
|
hooks_path = get_git_hooks_path(repo_path, bare)
|
|
|
_hook_file = os.path.join(hooks_path, 'pre-receive')
|
|
|
version = get_version_from_hook(_hook_file)
|
|
|
return version
|
|
|
|
|
|
|
|
|
def get_git_post_hook_version(repo_path, bare):
|
|
|
hooks_path = get_git_hooks_path(repo_path, bare)
|
|
|
_hook_file = os.path.join(hooks_path, 'post-receive')
|
|
|
version = get_version_from_hook(_hook_file)
|
|
|
return version
|
|
|
|
|
|
|
|
|
def get_svn_pre_hook_version(repo_path):
|
|
|
hooks_path = get_svn_hooks_path(repo_path)
|
|
|
_hook_file = os.path.join(hooks_path, 'pre-commit')
|
|
|
version = get_version_from_hook(_hook_file)
|
|
|
return version
|
|
|
|
|
|
|
|
|
def get_svn_post_hook_version(repo_path):
|
|
|
hooks_path = get_svn_hooks_path(repo_path)
|
|
|
_hook_file = os.path.join(hooks_path, 'post-commit')
|
|
|
version = get_version_from_hook(_hook_file)
|
|
|
return version
|
|
|
|