# RhodeCode VCSServer provides access to different vcs backends via network. # Copyright (C) 2014-2023 RhodeCode GmbH # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import re import os import sys import datetime import logging import pkg_resources import vcsserver import vcsserver.settings from vcsserver.lib.str_utils import safe_bytes log = logging.getLogger(__name__) HOOKS_DIR_MODE = 0o755 HOOKS_FILE_MODE = 0o755 def set_permissions_if_needed(path_to_check, perms: oct): # Get current permissions current_permissions = os.stat(path_to_check).st_mode & 0o777 # Extract permission bits # Check if current permissions are lower than required if current_permissions < int(perms): # Change the permissions if they are lower than required os.chmod(path_to_check, perms) def get_git_hooks_path(repo_path, bare): hooks_path = os.path.join(repo_path, 'hooks') if not bare: hooks_path = os.path.join(repo_path, '.git', 'hooks') return hooks_path def install_git_hooks(repo_path, bare, executable=None, force_create=False): """ Creates a RhodeCode hook inside a git repository :param repo_path: path to repository :param bare: defines if repository is considered a bare git repo :param executable: binary executable to put in the hooks :param force_create: Creates even if the same name hook exists """ executable = executable or sys.executable hooks_path = get_git_hooks_path(repo_path, bare) # we always call it to ensure dir exists and it has a proper mode if not os.path.exists(hooks_path): # If it doesn't exist, create a new directory with the specified mode os.makedirs(hooks_path, mode=HOOKS_DIR_MODE, exist_ok=True) # If it exists, change the directory's mode to the specified mode set_permissions_if_needed(hooks_path, perms=HOOKS_DIR_MODE) tmpl_post = pkg_resources.resource_string( 'vcsserver', '/'.join( ('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl'))) tmpl_pre = pkg_resources.resource_string( 'vcsserver', '/'.join( ('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl'))) path = '' # not used for now timestamp = datetime.datetime.utcnow().isoformat() for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]: log.debug('Installing git hook in repo %s', repo_path) _hook_file = os.path.join(hooks_path, f'{h_type}-receive') _rhodecode_hook = check_rhodecode_hook(_hook_file) if _rhodecode_hook or force_create: log.debug('writing git %s hook file at %s !', h_type, _hook_file) try: with open(_hook_file, 'wb') as f: template = template.replace(b'_TMPL_', safe_bytes(vcsserver.get_version())) template = template.replace(b'_DATE_', safe_bytes(timestamp)) template = template.replace(b'_ENV_', safe_bytes(executable)) template = template.replace(b'_PATH_', safe_bytes(path)) f.write(template) set_permissions_if_needed(_hook_file, perms=HOOKS_FILE_MODE) except OSError: log.exception('error writing hook file %s', _hook_file) else: log.debug('skipping writing hook file') return True def get_svn_hooks_path(repo_path): hooks_path = os.path.join(repo_path, 'hooks') return hooks_path def install_svn_hooks(repo_path, executable=None, force_create=False): """ Creates RhodeCode hooks inside a svn repository :param repo_path: path to repository :param executable: binary executable to put in the hooks :param force_create: Create even if same name hook exists """ executable = executable or sys.executable hooks_path = get_svn_hooks_path(repo_path) if not os.path.isdir(hooks_path): os.makedirs(hooks_path, mode=0o777, exist_ok=True) tmpl_post = pkg_resources.resource_string( 'vcsserver', '/'.join( ('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl'))) tmpl_pre = pkg_resources.resource_string( 'vcsserver', '/'.join( ('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl'))) path = '' # not used for now timestamp = datetime.datetime.utcnow().isoformat() for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]: log.debug('Installing svn hook in repo %s', repo_path) _hook_file = os.path.join(hooks_path, f'{h_type}-commit') _rhodecode_hook = check_rhodecode_hook(_hook_file) if _rhodecode_hook or force_create: log.debug('writing svn %s hook file at %s !', h_type, _hook_file) env_expand = str([ ('RC_CORE_BINARY_DIR', vcsserver.settings.BINARY_DIR), ('RC_GIT_EXECUTABLE', vcsserver.settings.GIT_EXECUTABLE()), ('RC_SVN_EXECUTABLE', vcsserver.settings.SVN_EXECUTABLE()), ('RC_SVNLOOK_EXECUTABLE', vcsserver.settings.SVNLOOK_EXECUTABLE()), ]) try: with open(_hook_file, 'wb') as f: template = template.replace(b'_TMPL_', safe_bytes(vcsserver.get_version())) template = template.replace(b'_DATE_', safe_bytes(timestamp)) template = template.replace(b'_OS_EXPAND_', safe_bytes(env_expand)) template = template.replace(b'_ENV_', safe_bytes(executable)) template = template.replace(b'_PATH_', safe_bytes(path)) f.write(template) os.chmod(_hook_file, 0o755) except OSError: log.exception('error writing hook file %s', _hook_file) else: log.debug('skipping writing hook file') return True def get_version_from_hook(hook_path): version = b'' hook_content = read_hook_content(hook_path) matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content) if matches: try: version = matches.groups()[0] log.debug('got version %s from hooks.', version) except Exception: log.exception("Exception while reading the hook version.") return version.replace(b"'", b"") def check_rhodecode_hook(hook_path): """ Check if the hook was created by RhodeCode """ if not os.path.exists(hook_path): return True log.debug('hook exists, checking if it is from RhodeCode') version = get_version_from_hook(hook_path) if version: return True return False def read_hook_content(hook_path) -> bytes: content = b'' if os.path.isfile(hook_path): with open(hook_path, 'rb') as f: content = f.read() return content def get_git_pre_hook_version(repo_path, bare): hooks_path = get_git_hooks_path(repo_path, bare) _hook_file = os.path.join(hooks_path, 'pre-receive') version = get_version_from_hook(_hook_file) return version def get_git_post_hook_version(repo_path, bare): hooks_path = get_git_hooks_path(repo_path, bare) _hook_file = os.path.join(hooks_path, 'post-receive') version = get_version_from_hook(_hook_file) return version def get_svn_pre_hook_version(repo_path): hooks_path = get_svn_hooks_path(repo_path) _hook_file = os.path.join(hooks_path, 'pre-commit') version = get_version_from_hook(_hook_file) return version def get_svn_post_hook_version(repo_path): hooks_path = get_svn_hooks_path(repo_path) _hook_file = os.path.join(hooks_path, 'post-commit') version = get_version_from_hook(_hook_file) return version