webhook.py
175 lines
| 7.6 KiB
| text/x-python
|
PythonLexer
r5123 | # Copyright (C) 2012-2023 RhodeCode GmbH | |||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import logging | ||||
from rhodecode import events | ||||
from rhodecode.integrations.types.base import CommitParsingDataHandler, UrlTmpl | ||||
log = logging.getLogger(__name__) | ||||
class WebhookDataHandler(CommitParsingDataHandler): | ||||
name = 'webhook' | ||||
def __init__(self, template_url, headers): | ||||
self.template_url = template_url | ||||
self.headers = headers | ||||
def get_base_parsed_template(self, data): | ||||
""" | ||||
initially parses the passed in template with some common variables | ||||
available on ALL calls | ||||
""" | ||||
# note: make sure to update the `WEBHOOK_URL_VARS` if this changes | ||||
common_vars = { | ||||
'repo_name': data['repo']['repo_name'], | ||||
'repo_type': data['repo']['repo_type'], | ||||
'repo_id': data['repo']['repo_id'], | ||||
'repo_url': data['repo']['url'], | ||||
'username': data['actor']['username'], | ||||
'user_id': data['actor']['user_id'], | ||||
'event_name': data['name'] | ||||
} | ||||
extra_vars = {} | ||||
for extra_key, extra_val in data['repo']['extra_fields'].items(): | ||||
extra_vars[f'extra__{extra_key}'] = extra_val | ||||
common_vars.update(extra_vars) | ||||
template_url = self.template_url.replace('${extra:', '${extra__') | ||||
for k, v in common_vars.items(): | ||||
template_url = UrlTmpl(template_url).safe_substitute(**{k: v}) | ||||
return template_url | ||||
def repo_push_event_handler(self, event, data): | ||||
url = self.get_base_parsed_template(data) | ||||
url_calls = [] | ||||
branches_commits = self.aggregate_branch_data( | ||||
data['push']['branches'], data['push']['commits']) | ||||
if '${branch}' in url or '${branch_head}' in url or '${commit_id}' in url: | ||||
# call it multiple times, for each branch if used in variables | ||||
for branch, commit_ids in branches_commits.items(): | ||||
branch_url = UrlTmpl(url).safe_substitute(branch=branch) | ||||
if '${branch_head}' in branch_url: | ||||
# last commit in the aggregate is the head of the branch | ||||
branch_head = commit_ids['branch_head'] | ||||
branch_url = UrlTmpl(branch_url).safe_substitute(branch_head=branch_head) | ||||
# call further down for each commit if used | ||||
if '${commit_id}' in branch_url: | ||||
for commit_data in commit_ids['commits']: | ||||
commit_id = commit_data['raw_id'] | ||||
commit_url = UrlTmpl(branch_url).safe_substitute(commit_id=commit_id) | ||||
# register per-commit call | ||||
log.debug( | ||||
'register %s call(%s) to url %s', | ||||
self.name, event, commit_url) | ||||
url_calls.append( | ||||
(commit_url, self.headers, data)) | ||||
else: | ||||
# register per-branch call | ||||
log.debug('register %s call(%s) to url %s', | ||||
self.name, event, branch_url) | ||||
url_calls.append((branch_url, self.headers, data)) | ||||
else: | ||||
log.debug('register %s call(%s) to url %s', self.name, event, url) | ||||
url_calls.append((url, self.headers, data)) | ||||
return url_calls | ||||
def repo_commit_comment_handler(self, event, data): | ||||
url = self.get_base_parsed_template(data) | ||||
log.debug('register %s call(%s) to url %s', self.name, event, url) | ||||
comment_vars = [ | ||||
('commit_comment_id', data['comment']['comment_id']), | ||||
('commit_comment_text', data['comment']['comment_text']), | ||||
('commit_comment_type', data['comment']['comment_type']), | ||||
('commit_comment_f_path', data['comment']['comment_f_path']), | ||||
('commit_comment_line_no', data['comment']['comment_line_no']), | ||||
('commit_comment_commit_id', data['commit']['commit_id']), | ||||
('commit_comment_commit_branch', data['commit']['commit_branch']), | ||||
('commit_comment_commit_message', data['commit']['commit_message']), | ||||
] | ||||
for k, v in comment_vars: | ||||
url = UrlTmpl(url).safe_substitute(**{k: v}) | ||||
return [(url, self.headers, data)] | ||||
def repo_commit_comment_edit_handler(self, event, data): | ||||
url = self.get_base_parsed_template(data) | ||||
log.debug('register %s call(%s) to url %s', self.name, event, url) | ||||
comment_vars = [ | ||||
('commit_comment_id', data['comment']['comment_id']), | ||||
('commit_comment_text', data['comment']['comment_text']), | ||||
('commit_comment_type', data['comment']['comment_type']), | ||||
('commit_comment_f_path', data['comment']['comment_f_path']), | ||||
('commit_comment_line_no', data['comment']['comment_line_no']), | ||||
('commit_comment_commit_id', data['commit']['commit_id']), | ||||
('commit_comment_commit_branch', data['commit']['commit_branch']), | ||||
('commit_comment_commit_message', data['commit']['commit_message']), | ||||
] | ||||
for k, v in comment_vars: | ||||
url = UrlTmpl(url).safe_substitute(**{k: v}) | ||||
return [(url, self.headers, data)] | ||||
def repo_create_event_handler(self, event, data): | ||||
url = self.get_base_parsed_template(data) | ||||
log.debug('register %s call(%s) to url %s', self.name, event, url) | ||||
return [(url, self.headers, data)] | ||||
def pull_request_event_handler(self, event, data): | ||||
url = self.get_base_parsed_template(data) | ||||
log.debug('register %s call(%s) to url %s', self.name, event, url) | ||||
pr_vars = [ | ||||
('pull_request_id', data['pullrequest']['pull_request_id']), | ||||
('pull_request_title', data['pullrequest']['title']), | ||||
('pull_request_url', data['pullrequest']['url']), | ||||
('pull_request_shadow_url', data['pullrequest']['shadow_url']), | ||||
('pull_request_commits_uid', data['pullrequest']['commits_uid']), | ||||
] | ||||
for k, v in pr_vars: | ||||
url = UrlTmpl(url).safe_substitute(**{k: v}) | ||||
return [(url, self.headers, data)] | ||||
def __call__(self, event, data): | ||||
if isinstance(event, events.RepoPushEvent): | ||||
return self.repo_push_event_handler(event, data) | ||||
elif isinstance(event, events.RepoCreateEvent): | ||||
return self.repo_create_event_handler(event, data) | ||||
elif isinstance(event, events.RepoCommitCommentEvent): | ||||
return self.repo_commit_comment_handler(event, data) | ||||
elif isinstance(event, events.RepoCommitCommentEditEvent): | ||||
return self.repo_commit_comment_edit_handler(event, data) | ||||
elif isinstance(event, events.PullRequestEvent): | ||||
return self.pull_request_event_handler(event, data) | ||||
else: | ||||
raise ValueError( | ||||
f'event type `{event.__class__}` has no handler defined') | ||||