__init__.py
230 lines
| 8.2 KiB
| text/x-python
|
PythonLexer
r407 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
r1126 | # Copyright (C) 2014-2023 RhodeCode GmbH | |||
r407 | # | |||
# This program is free software; you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation; either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program; if not, write to the Free Software Foundation, | ||||
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||||
r1214 | ||||
r407 | import re | |||
import os | ||||
import sys | ||||
import datetime | ||||
import logging | ||||
import pkg_resources | ||||
import vcsserver | ||||
r1214 | import vcsserver.settings | |||
r1249 | from vcsserver.lib.str_utils import safe_bytes | |||
r407 | ||||
log = logging.getLogger(__name__) | ||||
r1186 | HOOKS_DIR_MODE = 0o755 | |||
HOOKS_FILE_MODE = 0o755 | ||||
def set_permissions_if_needed(path_to_check, perms: oct): | ||||
# Get current permissions | ||||
current_permissions = os.stat(path_to_check).st_mode & 0o777 # Extract permission bits | ||||
# Check if current permissions are lower than required | ||||
if current_permissions < int(perms): | ||||
# Change the permissions if they are lower than required | ||||
os.chmod(path_to_check, perms) | ||||
r407 | ||||
r623 | def get_git_hooks_path(repo_path, bare): | |||
hooks_path = os.path.join(repo_path, 'hooks') | ||||
if not bare: | ||||
hooks_path = os.path.join(repo_path, '.git', 'hooks') | ||||
return hooks_path | ||||
r407 | def install_git_hooks(repo_path, bare, executable=None, force_create=False): | |||
""" | ||||
Creates a RhodeCode hook inside a git repository | ||||
:param repo_path: path to repository | ||||
r1179 | :param bare: defines if repository is considered a bare git repo | |||
r407 | :param executable: binary executable to put in the hooks | |||
r1179 | :param force_create: Creates even if the same name hook exists | |||
r407 | """ | |||
executable = executable or sys.executable | ||||
r623 | hooks_path = get_git_hooks_path(repo_path, bare) | |||
r1179 | # we always call it to ensure dir exists and it has a proper mode | |||
if not os.path.exists(hooks_path): | ||||
# If it doesn't exist, create a new directory with the specified mode | ||||
r1186 | os.makedirs(hooks_path, mode=HOOKS_DIR_MODE, exist_ok=True) | |||
# If it exists, change the directory's mode to the specified mode | ||||
set_permissions_if_needed(hooks_path, perms=HOOKS_DIR_MODE) | ||||
r407 | ||||
tmpl_post = pkg_resources.resource_string( | ||||
'vcsserver', '/'.join( | ||||
('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl'))) | ||||
tmpl_pre = pkg_resources.resource_string( | ||||
'vcsserver', '/'.join( | ||||
('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl'))) | ||||
path = '' # not used for now | ||||
timestamp = datetime.datetime.utcnow().isoformat() | ||||
for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]: | ||||
log.debug('Installing git hook in repo %s', repo_path) | ||||
r1152 | _hook_file = os.path.join(hooks_path, f'{h_type}-receive') | |||
r407 | _rhodecode_hook = check_rhodecode_hook(_hook_file) | |||
if _rhodecode_hook or force_create: | ||||
log.debug('writing git %s hook file at %s !', h_type, _hook_file) | ||||
try: | ||||
with open(_hook_file, 'wb') as f: | ||||
r1188 | template = template.replace(b'_TMPL_', safe_bytes(vcsserver.get_version())) | |||
r1048 | template = template.replace(b'_DATE_', safe_bytes(timestamp)) | |||
template = template.replace(b'_ENV_', safe_bytes(executable)) | ||||
template = template.replace(b'_PATH_', safe_bytes(path)) | ||||
r407 | f.write(template) | |||
r1186 | set_permissions_if_needed(_hook_file, perms=HOOKS_FILE_MODE) | |||
r1130 | except OSError: | |||
r407 | log.exception('error writing hook file %s', _hook_file) | |||
else: | ||||
log.debug('skipping writing hook file') | ||||
return True | ||||
r623 | def get_svn_hooks_path(repo_path): | |||
hooks_path = os.path.join(repo_path, 'hooks') | ||||
return hooks_path | ||||
r407 | def install_svn_hooks(repo_path, executable=None, force_create=False): | |||
""" | ||||
Creates RhodeCode hooks inside a svn repository | ||||
:param repo_path: path to repository | ||||
:param executable: binary executable to put in the hooks | ||||
:param force_create: Create even if same name hook exists | ||||
""" | ||||
executable = executable or sys.executable | ||||
r623 | hooks_path = get_svn_hooks_path(repo_path) | |||
r407 | if not os.path.isdir(hooks_path): | |||
r1120 | os.makedirs(hooks_path, mode=0o777, exist_ok=True) | |||
r407 | ||||
tmpl_post = pkg_resources.resource_string( | ||||
'vcsserver', '/'.join( | ||||
('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl'))) | ||||
tmpl_pre = pkg_resources.resource_string( | ||||
'vcsserver', '/'.join( | ||||
('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl'))) | ||||
path = '' # not used for now | ||||
timestamp = datetime.datetime.utcnow().isoformat() | ||||
for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]: | ||||
log.debug('Installing svn hook in repo %s', repo_path) | ||||
r1152 | _hook_file = os.path.join(hooks_path, f'{h_type}-commit') | |||
r407 | _rhodecode_hook = check_rhodecode_hook(_hook_file) | |||
if _rhodecode_hook or force_create: | ||||
log.debug('writing svn %s hook file at %s !', h_type, _hook_file) | ||||
r1214 | env_expand = str([ | |||
('RC_CORE_BINARY_DIR', vcsserver.settings.BINARY_DIR), | ||||
r1230 | ('RC_GIT_EXECUTABLE', vcsserver.settings.GIT_EXECUTABLE()), | |||
('RC_SVN_EXECUTABLE', vcsserver.settings.SVN_EXECUTABLE()), | ||||
('RC_SVNLOOK_EXECUTABLE', vcsserver.settings.SVNLOOK_EXECUTABLE()), | ||||
r1214 | ||||
]) | ||||
r407 | try: | |||
with open(_hook_file, 'wb') as f: | ||||
r1188 | template = template.replace(b'_TMPL_', safe_bytes(vcsserver.get_version())) | |||
r1048 | template = template.replace(b'_DATE_', safe_bytes(timestamp)) | |||
r1214 | template = template.replace(b'_OS_EXPAND_', safe_bytes(env_expand)) | |||
r1048 | template = template.replace(b'_ENV_', safe_bytes(executable)) | |||
template = template.replace(b'_PATH_', safe_bytes(path)) | ||||
r407 | ||||
f.write(template) | ||||
r590 | os.chmod(_hook_file, 0o755) | |||
r1130 | except OSError: | |||
r407 | log.exception('error writing hook file %s', _hook_file) | |||
else: | ||||
log.debug('skipping writing hook file') | ||||
return True | ||||
r623 | def get_version_from_hook(hook_path): | |||
r1048 | version = b'' | |||
r623 | hook_content = read_hook_content(hook_path) | |||
r1120 | matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content) | |||
r623 | if matches: | |||
try: | ||||
version = matches.groups()[0] | ||||
log.debug('got version %s from hooks.', version) | ||||
except Exception: | ||||
log.exception("Exception while reading the hook version.") | ||||
r1048 | return version.replace(b"'", b"") | |||
r623 | ||||
r407 | def check_rhodecode_hook(hook_path): | |||
""" | ||||
Check if the hook was created by RhodeCode | ||||
""" | ||||
if not os.path.exists(hook_path): | ||||
return True | ||||
r623 | log.debug('hook exists, checking if it is from RhodeCode') | |||
version = get_version_from_hook(hook_path) | ||||
if version: | ||||
return True | ||||
r407 | ||||
return False | ||||
r1066 | def read_hook_content(hook_path) -> bytes: | |||
content = b'' | ||||
r691 | if os.path.isfile(hook_path): | |||
with open(hook_path, 'rb') as f: | ||||
content = f.read() | ||||
r407 | return content | |||
r623 | ||||
def get_git_pre_hook_version(repo_path, bare): | ||||
hooks_path = get_git_hooks_path(repo_path, bare) | ||||
_hook_file = os.path.join(hooks_path, 'pre-receive') | ||||
version = get_version_from_hook(_hook_file) | ||||
return version | ||||
def get_git_post_hook_version(repo_path, bare): | ||||
hooks_path = get_git_hooks_path(repo_path, bare) | ||||
_hook_file = os.path.join(hooks_path, 'post-receive') | ||||
version = get_version_from_hook(_hook_file) | ||||
return version | ||||
def get_svn_pre_hook_version(repo_path): | ||||
hooks_path = get_svn_hooks_path(repo_path) | ||||
_hook_file = os.path.join(hooks_path, 'pre-commit') | ||||
version = get_version_from_hook(_hook_file) | ||||
return version | ||||
def get_svn_post_hook_version(repo_path): | ||||
hooks_path = get_svn_hooks_path(repo_path) | ||||
_hook_file = os.path.join(hooks_path, 'post-commit') | ||||
version = get_version_from_hook(_hook_file) | ||||
return version | ||||