##// END OF EJS Templates
code: fixed typos
code: fixed typos

File last commit:

r1326:f4bef470 default
r1326:f4bef470 default
Show More
hooks.py
798 lines | 24.4 KiB | text/x-python | PythonLexer
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import io
import os
import sys
import logging
import collections
import base64
import msgpack
import dataclasses
import pygit2
import http.client
from celery import Celery
import mercurial.scmutil
import mercurial.node
from vcsserver import exceptions, subprocessio, settings
from vcsserver.lib.ext_json import json
from vcsserver.lib.str_utils import ascii_str, safe_str
from vcsserver.lib.svn_txn_utils import get_txn_id_from_store
from vcsserver.remote.git_remote import Repository
celery_app = Celery('__vcsserver__')
log = logging.getLogger(__name__)
class HooksCeleryClient:
TASK_TIMEOUT = 60 # time in seconds
custom_opts = {}
def __init__(self, broker_url, result_backend, **kwargs):
_celery_opts = {
'broker_url': broker_url,
'result_backend': result_backend,
'broker_connection_retry_on_startup': True,
'task_serializer': 'json',
'accept_content': ['json', 'msgpack'],
'result_serializer': 'json',
'result_accept_content': ['json', 'msgpack']
}
if custom_opts := kwargs.pop('_celery_opts', {}):
_celery_opts.update(custom_opts)
self.custom_opts = custom_opts
celery_app.config_from_object(_celery_opts)
self.celery_app = celery_app
def __call__(self, method, extras):
# NOTE: exception handling for those tasks executed is in
# @adopt_for_celery decorator
# also see: _maybe_handle_exception which is handling exceptions
inquired_task = self.celery_app.signature(f'rhodecode.lib.celerylib.tasks.{method}')
task_meta = inquired_task.apply_async(args=(extras,))
result = task_meta.get(timeout=self.TASK_TIMEOUT)
return result
def __repr__(self):
return f'HooksCeleryClient(opts={self.custom_opts})'
class HooksShadowRepoClient:
def __call__(self, hook_name, extras):
return {'output': '', 'status': 0}
class RemoteMessageWriter:
"""Writer base class."""
def write(self, message):
raise NotImplementedError()
class HgMessageWriter(RemoteMessageWriter):
"""Writer that knows how to send messages to mercurial clients."""
def __init__(self, ui):
self.ui = ui
def write(self, message: str):
args = (message.encode('utf-8'),)
self.ui._writemsg(self.ui._fmsgerr, type=b'status', *args)
class GitMessageWriter(RemoteMessageWriter):
"""Writer that knows how to send messages to git clients."""
def __init__(self, stdout=None):
self.stdout = stdout or sys.stdout
def write(self, message: str):
self.stdout.write(message + "\n" if message else "")
class SvnMessageWriter(RemoteMessageWriter):
"""Writer that knows how to send messages to svn clients."""
def __init__(self, stderr=None):
# SVN needs data sent to stderr for back-to-client messaging
self.stderr = stderr or sys.stderr
def write(self, message):
self.stderr.write(message)
def _maybe_handle_exception(writer, result):
"""
adopt_for_celery defines the exception/exception_traceback
This result is a direct output from a celery task
"""
exception_class = result.get('exception')
exception_traceback = result.get('exception_traceback')
match exception_class:
# NOTE: the underlying exceptions are setting _vcs_kind special marker
# which is later handled by `handle_vcs_exception` and translated into a special HTTP exception
# propagated later to the client
case 'HTTPLockedRepo':
raise exceptions.LockedRepoException()(*result['exception_args'])
case 'ClientNotSupported':
raise exceptions.ClientNotSupportedException()(*result['exception_args'])
case 'HTTPBranchProtected':
raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
case 'RepositoryError':
raise exceptions.VcsException()(*result['exception_args'])
case _:
if exception_class:
# level here should be info/debug so the remote client wouldn't see this as a stderr message
log.info('ERROR: Handling hook-call exception. Got traceback from remote call:%s', exception_traceback)
raise Exception(
f"""Got remote exception "{exception_class}" with args "{result['exception_args']}" """
)
def _get_hooks_client(extras):
is_shadow_repo = extras.get('is_shadow_repo')
hooks_protocol = extras.get('hooks_protocol')
if hooks_protocol == 'celery':
try:
celery_config = extras['hooks_config']
broker_url = celery_config['broker_url']
result_backend = celery_config['result_backend']
except Exception:
log.exception("Failed to get celery task queue and backend")
raise
return HooksCeleryClient(broker_url, result_backend, _celery_opts=celery_config)
elif is_shadow_repo:
return HooksShadowRepoClient()
else:
raise Exception("Hooks client not found!")
def _call_hook(hook_name, extras, writer):
hooks_client = _get_hooks_client(extras)
log.debug('Hooks, using client:%s', hooks_client)
result = hooks_client(hook_name, extras)
log.debug('Hooks got result: %s', result)
_maybe_handle_exception(writer, result)
writer.write(result['output'])
return result['status']
def _extras_from_ui(ui):
hook_data = ui.config(b'rhodecode', b'RC_SCM_DATA')
if not hook_data:
# maybe it's inside environ ?
env_hook_data = os.environ.get('RC_SCM_DATA')
if env_hook_data:
hook_data = env_hook_data
extras = {}
if hook_data:
extras = json.loads(hook_data)
return extras
def _rev_range_hash(repo, node, check_heads=False):
from vcsserver.hgcompat import get_ctx
commits = []
revs = []
start = get_ctx(repo, node).rev()
end = len(repo)
for rev in range(start, end):
revs.append(rev)
ctx = get_ctx(repo, rev)
commit_id = ascii_str(mercurial.node.hex(ctx.node()))
branch = safe_str(ctx.branch())
commits.append((commit_id, branch))
parent_heads = []
if check_heads:
parent_heads = _check_heads(repo, start, end, revs)
return commits, parent_heads
def _check_heads(repo, start, end, commits):
from vcsserver.hgcompat import get_ctx
changelog = repo.changelog
parents = set()
for new_rev in commits:
for p in changelog.parentrevs(new_rev):
if p == mercurial.node.nullrev:
continue
if p < start:
parents.add(p)
for p in parents:
branch = get_ctx(repo, p).branch()
# The heads descending from that parent, on the same branch
parent_heads = {p}
reachable = {p}
for x in range(p + 1, end):
if get_ctx(repo, x).branch() != branch:
continue
for pp in changelog.parentrevs(x):
if pp in reachable:
reachable.add(x)
parent_heads.discard(pp)
parent_heads.add(x)
# More than one head? Suggest merging
if len(parent_heads) > 1:
return list(parent_heads)
return []
def _get_git_env():
env = {}
for k, v in os.environ.items():
if k.startswith('GIT'):
env[k] = v
# serialized version
return [(k, v) for k, v in env.items()]
def _get_hg_env(old_rev, new_rev, txnid, repo_path):
env = {}
for k, v in os.environ.items():
if k.startswith('HG'):
env[k] = v
env['HG_NODE'] = old_rev
env['HG_NODE_LAST'] = new_rev
env['HG_TXNID'] = txnid
env['HG_PENDING'] = repo_path
return [(k, v) for k, v in env.items()]
def _get_ini_settings(ini_file):
from vcsserver.http_main import sanitize_settings_and_apply_defaults
from vcsserver.lib.config_utils import get_app_config_lightweight, configure_and_store_settings
global_config = {'__file__': ini_file}
ini_settings = get_app_config_lightweight(ini_file)
sanitize_settings_and_apply_defaults(global_config, ini_settings)
configure_and_store_settings(global_config, ini_settings)
return ini_settings
def _fix_hooks_executables(ini_path=''):
"""
This is a trick to set proper settings.EXECUTABLE paths for certain execution patterns
especially for subversion where hooks strip entire env, and calling just 'svn' command will most likely fail
because svn is not on PATH
"""
# set defaults, in case we can't read from ini_file
core_binary_dir = settings.BINARY_DIR or '/usr/local/bin/rhodecode_bin/vcs_bin'
if ini_path:
ini_settings = _get_ini_settings(ini_path)
core_binary_dir = ini_settings['core.binary_dir']
settings.BINARY_DIR = core_binary_dir
def repo_size(ui, repo, **kwargs):
extras = _extras_from_ui(ui)
return _call_hook('repo_size', extras, HgMessageWriter(ui))
def pre_pull(ui, repo, **kwargs):
extras = _extras_from_ui(ui)
return _call_hook('pre_pull', extras, HgMessageWriter(ui))
def pre_pull_ssh(ui, repo, **kwargs):
extras = _extras_from_ui(ui)
if extras and extras.get('SSH'):
return pre_pull(ui, repo, **kwargs)
return 0
def post_pull(ui, repo, **kwargs):
extras = _extras_from_ui(ui)
return _call_hook('post_pull', extras, HgMessageWriter(ui))
def post_pull_ssh(ui, repo, **kwargs):
extras = _extras_from_ui(ui)
if extras and extras.get('SSH'):
return post_pull(ui, repo, **kwargs)
return 0
def pre_push(ui, repo, node=None, **kwargs):
"""
Mercurial pre_push hook
"""
extras = _extras_from_ui(ui)
detect_force_push = extras.get('detect_force_push')
rev_data = []
hook_type: str = safe_str(kwargs.get('hooktype'))
if node and hook_type == 'pretxnchangegroup':
branches = collections.defaultdict(list)
commits, _heads = _rev_range_hash(repo, node, check_heads=detect_force_push)
for commit_id, branch in commits:
branches[branch].append(commit_id)
for branch, commits in branches.items():
old_rev = ascii_str(kwargs.get('node_last')) or commits[0]
rev_data.append({
'total_commits': len(commits),
'old_rev': old_rev,
'new_rev': commits[-1],
'ref': '',
'type': 'branch',
'name': branch,
})
for push_ref in rev_data:
push_ref['multiple_heads'] = _heads
repo_path = os.path.join(
extras.get('repo_store', ''), extras.get('repository', ''))
push_ref['hg_env'] = _get_hg_env(
old_rev=push_ref['old_rev'],
new_rev=push_ref['new_rev'], txnid=ascii_str(kwargs.get('txnid')),
repo_path=repo_path)
extras['hook_type'] = hook_type or 'pre_push'
extras['commit_ids'] = rev_data
return _call_hook('pre_push', extras, HgMessageWriter(ui))
def pre_push_ssh(ui, repo, node=None, **kwargs):
extras = _extras_from_ui(ui)
if extras.get('SSH'):
return pre_push(ui, repo, node, **kwargs)
return 0
def pre_push_ssh_auth(ui, repo, node=None, **kwargs):
"""
Mercurial pre_push hook for SSH
"""
extras = _extras_from_ui(ui)
if extras.get('SSH'):
permission = extras['SSH_PERMISSIONS']
if 'repository.write' == permission or 'repository.admin' == permission:
return 0
# non-zero ret code
return 1
return 0
def post_push(ui, repo, node, **kwargs):
"""
Mercurial post_push hook
"""
extras = _extras_from_ui(ui)
commit_ids = []
branches = []
bookmarks = []
tags = []
hook_type: str = safe_str(kwargs.get('hooktype'))
commits, _heads = _rev_range_hash(repo, node)
for commit_id, branch in commits:
commit_ids.append(commit_id)
if branch not in branches:
branches.append(branch)
if hasattr(ui, '_rc_pushkey_bookmarks'):
bookmarks = ui._rc_pushkey_bookmarks
extras['hook_type'] = hook_type or 'post_push'
extras['commit_ids'] = commit_ids
extras['new_refs'] = {
'branches': branches,
'bookmarks': bookmarks,
'tags': tags
}
return _call_hook('post_push', extras, HgMessageWriter(ui))
def post_push_ssh(ui, repo, node, **kwargs):
"""
Mercurial post_push hook for SSH
"""
if _extras_from_ui(ui).get('SSH'):
return post_push(ui, repo, node, **kwargs)
return 0
def key_push(ui, repo, **kwargs):
from vcsserver.hgcompat import get_ctx
if kwargs['new'] != b'0' and kwargs['namespace'] == b'bookmarks':
# store new bookmarks in our UI object propagated later to post_push
ui._rc_pushkey_bookmarks = get_ctx(repo, kwargs['key']).bookmarks()
return
# backward compat
log_pull_action = post_pull
# backward compat
log_push_action = post_push
def handle_git_pre_receive(unused_repo_path, unused_revs, unused_env):
"""
Old hook name: keep here for backward compatibility.
This is only required when the installed git hooks are not upgraded.
"""
pass
def handle_git_post_receive(unused_repo_path, unused_revs, unused_env):
"""
Old hook name: keep here for backward compatibility.
This is only required when the installed git hooks are not upgraded.
"""
pass
@dataclasses.dataclass
class HookResponse:
status: int
output: str
def git_pre_pull(extras) -> HookResponse:
"""
Pre pull hook.
:param extras: dictionary containing the keys defined in simplevcs
:type extras: dict
:return: status code of the hook. 0 for success.
:rtype: int
"""
if 'pull' not in extras['hooks']:
return HookResponse(0, '')
stdout = io.StringIO()
try:
status_code = _call_hook('pre_pull', extras, GitMessageWriter(stdout))
except Exception as error:
log.exception('Failed to call pre_pull hook')
status_code = 128
stdout.write(f'ERROR: {error}\n')
return HookResponse(status_code, stdout.getvalue())
def git_post_pull(extras) -> HookResponse:
"""
Post pull hook.
:param extras: dictionary containing the keys defined in simplevcs
:type extras: dict
:return: status code of the hook. 0 for success.
:rtype: int
"""
if 'pull' not in extras['hooks']:
return HookResponse(0, '')
stdout = io.StringIO()
try:
status = _call_hook('post_pull', extras, GitMessageWriter(stdout))
except Exception as error:
status = 128
stdout.write(f'ERROR: {error}\n')
return HookResponse(status, stdout.getvalue())
def _parse_git_ref_lines(revision_lines):
rev_data = []
for revision_line in revision_lines or []:
old_rev, new_rev, ref = revision_line.strip().split(' ')
ref_data = ref.split('/', 2)
if ref_data[1] in ('tags', 'heads'):
rev_data.append({
# NOTE(marcink):
# we're unable to tell total_commits for git at this point
# but we set the variable for consistency with GIT
'total_commits': -1,
'old_rev': old_rev,
'new_rev': new_rev,
'ref': ref,
'type': ref_data[1],
'name': ref_data[2],
})
return rev_data
def git_pre_receive(unused_repo_path, revision_lines, env) -> int:
"""
Pre push hook.
:return: status code of the hook. 0 for success.
"""
extras = json.loads(env['RC_SCM_DATA'])
rev_data = _parse_git_ref_lines(revision_lines)
if 'push' not in extras['hooks']:
return 0
_fix_hooks_executables(env.get('RC_INI_FILE'))
empty_commit_id = '0' * 40
detect_force_push = extras.get('detect_force_push')
for push_ref in rev_data:
# store our git-env which holds the temp store
push_ref['git_env'] = _get_git_env()
push_ref['pruned_sha'] = ''
if not detect_force_push:
# don't check for forced-push when we don't need to
continue
type_ = push_ref['type']
new_branch = push_ref['old_rev'] == empty_commit_id
delete_branch = push_ref['new_rev'] == empty_commit_id
if type_ == 'heads' and not (new_branch or delete_branch):
old_rev = push_ref['old_rev']
new_rev = push_ref['new_rev']
cmd = [settings.GIT_EXECUTABLE(), 'rev-list', old_rev, f'^{new_rev}']
stdout, stderr = subprocessio.run_command(
cmd, env=os.environ.copy())
# means we're having some non-reachable objects, this forced push was used
if stdout:
push_ref['pruned_sha'] = stdout.splitlines()
extras['hook_type'] = 'pre_receive'
extras['commit_ids'] = rev_data
stdout = sys.stdout
status_code = _call_hook('pre_push', extras, GitMessageWriter(stdout))
return status_code
def git_post_receive(unused_repo_path, revision_lines, env) -> int:
"""
Post push hook.
:return: status code of the hook. 0 for success.
"""
extras = json.loads(env['RC_SCM_DATA'])
if 'push' not in extras['hooks']:
return 0
_fix_hooks_executables(env.get('RC_INI_FILE'))
rev_data = _parse_git_ref_lines(revision_lines)
git_revs = []
# N.B.(skreft): it is ok to just call git, as git before calling a
# subcommand sets the PATH environment variable so that it point to the
# correct version of the git executable.
empty_commit_id = '0' * 40
branches = []
tags = []
for push_ref in rev_data:
type_ = push_ref['type']
if type_ == 'heads':
# starting new branch case
if push_ref['old_rev'] == empty_commit_id:
push_ref_name = push_ref['name']
if push_ref_name not in branches:
branches.append(push_ref_name)
need_head_set = ''
with Repository(os.getcwd()) as repo:
try:
repo.head
except pygit2.GitError:
need_head_set = f'refs/heads/{push_ref_name}'
if need_head_set:
repo.set_head(need_head_set)
print(f"Setting default branch to {push_ref_name}")
cmd = [settings.GIT_EXECUTABLE(), 'for-each-ref', '--format=%(refname)', 'refs/heads/*']
stdout, stderr = subprocessio.run_command(
cmd, env=os.environ.copy())
heads = safe_str(stdout)
heads = heads.replace(push_ref['ref'], '')
heads = ' '.join(head for head
in heads.splitlines() if head) or '.'
cmd = [settings.GIT_EXECUTABLE(), 'log', '--reverse',
'--pretty=format:%H', '--', push_ref['new_rev'],
'--not', heads]
stdout, stderr = subprocessio.run_command(
cmd, env=os.environ.copy())
git_revs.extend(list(map(ascii_str, stdout.splitlines())))
# delete branch case
elif push_ref['new_rev'] == empty_commit_id:
git_revs.append(f'delete_branch=>{push_ref["name"]}')
else:
if push_ref['name'] not in branches:
branches.append(push_ref['name'])
cmd = [settings.GIT_EXECUTABLE(), 'log',
f'{push_ref["old_rev"]}..{push_ref["new_rev"]}',
'--reverse', '--pretty=format:%H']
stdout, stderr = subprocessio.run_command(
cmd, env=os.environ.copy())
# we get bytes from stdout, we need str to be consistent
log_revs = list(map(ascii_str, stdout.splitlines()))
git_revs.extend(log_revs)
# Pure pygit2 impl. but still 2-3x slower :/
# results = []
#
# with Repository(os.getcwd()) as repo:
# repo_new_rev = repo[push_ref['new_rev']]
# repo_old_rev = repo[push_ref['old_rev']]
# walker = repo.walk(repo_new_rev.id, pygit2.GIT_SORT_TOPOLOGICAL)
#
# for commit in walker:
# if commit.id == repo_old_rev.id:
# break
# results.append(commit.id.hex)
# # reverse the order, can't use GIT_SORT_REVERSE
# log_revs = results[::-1]
elif type_ == 'tags':
if push_ref['name'] not in tags:
tags.append(push_ref['name'])
git_revs.append(f'tag=>{push_ref["name"]}')
extras['hook_type'] = 'post_receive'
extras['commit_ids'] = git_revs
extras['new_refs'] = {
'branches': branches,
'bookmarks': [],
'tags': tags,
}
stdout = sys.stdout
if 'repo_size' in extras['hooks']:
try:
_call_hook('repo_size', extras, GitMessageWriter(stdout))
except Exception:
pass
status_code = _call_hook('post_push', extras, GitMessageWriter(stdout))
return status_code
def get_extras_from_txn_id(repo_path, txn_id):
extras = get_txn_id_from_store(repo_path, txn_id)
return extras
def svn_pre_commit(repo_path, commit_data, env):
path, txn_id = commit_data
branches = []
tags = []
if env.get('RC_SCM_DATA'):
extras = json.loads(env['RC_SCM_DATA'])
else:
ini_path = env.get('RC_INI_FILE')
if ini_path:
_get_ini_settings(ini_path)
# fallback method to read from TXN-ID stored data
extras = get_extras_from_txn_id(path, txn_id)
if not extras:
raise ValueError('SVN-PRE-COMMIT: Failed to extract context data in called extras for hook execution')
if extras.get('rc_internal_commit'):
# special marker for internal commit, we don't call hooks client
return 0
extras['hook_type'] = 'pre_commit'
extras['commit_ids'] = [txn_id]
extras['txn_id'] = txn_id
extras['new_refs'] = {
'total_commits': 1,
'branches': branches,
'bookmarks': [],
'tags': tags,
}
return _call_hook('pre_push', extras, SvnMessageWriter())
def svn_post_commit(repo_path, commit_data, env):
"""
commit_data is path, rev, txn_id
"""
if len(commit_data) == 3:
path, commit_id, txn_id = commit_data
elif len(commit_data) == 2:
log.error('Failed to extract txn_id from commit_data using legacy method. '
'Some functionality might be limited')
path, commit_id = commit_data
txn_id = None
else:
return 0
branches = []
tags = []
if env.get('RC_SCM_DATA'):
extras = json.loads(env['RC_SCM_DATA'])
else:
ini_path = env.get('RC_INI_FILE')
if ini_path:
_get_ini_settings(ini_path)
# fallback method to read from TXN-ID stored data
extras = get_extras_from_txn_id(path, txn_id)
if not extras and txn_id:
raise ValueError('SVN-POST-COMMIT: Failed to extract context data in called extras for hook execution')
if extras.get('rc_internal_commit'):
# special marker for internal commit, we don't call hooks client
return 0
extras['hook_type'] = 'post_commit'
extras['commit_ids'] = [commit_id]
extras['txn_id'] = txn_id
extras['new_refs'] = {
'branches': branches,
'bookmarks': [],
'tags': tags,
'total_commits': 1,
}
if 'repo_size' in extras['hooks']:
try:
_call_hook('repo_size', extras, SvnMessageWriter())
except Exception:
pass
return _call_hook('post_push', extras, SvnMessageWriter())