# Copyright (C) 2012-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import deform.widget
import logging
import colander
import rhodecode
from rhodecode import events
from rhodecode.lib.colander_utils import strip_whitespace
from rhodecode.model.validation_schema.widgets import CheckboxChoiceWidgetDesc
from rhodecode.translation import _
from rhodecode.integrations.types.base import (
IntegrationTypeBase, get_auth, get_web_token, get_url_vars,
WebhookDataHandler, WEBHOOK_URL_VARS, requests_retry_call)
from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
from rhodecode.model.validation_schema import widgets
log = logging.getLogger(__name__)
# updating this required to update the `common_vars` passed in url calling func
URL_VARS = get_url_vars(WEBHOOK_URL_VARS)
class WebhookSettingsSchema(colander.Schema):
url = colander.SchemaNode(
colander.String(),
title=_('Webhook URL'),
description=
_('URL to which Webhook should submit data. If used some of the '
'variables would trigger multiple calls, like ${branch} or '
'${commit_id}. Webhook will be called as many times as unique '
'objects in data in such cases.'),
missing=colander.required,
required=True,
preparer=strip_whitespace,
validator=colander.url,
widget=widgets.CodeMirrorWidget(
help_block_collapsable_name='Show url variables',
help_block_collapsable=(
'E.g http://my-serv.com/trigger_job/${{event_name}}'
'?PR_ID=${{pull_request_id}}'
'\nFull list of vars:\n{}'.format(URL_VARS)),
codemirror_mode='text',
codemirror_options='{"lineNumbers": false, "lineWrapping": true}'),
)
secret_token = colander.SchemaNode(
colander.String(),
title=_('Secret Token'),
description=_('Optional string used to validate received payloads. '
'It will be sent together with event data in JSON'),
default='',
missing='',
widget=deform.widget.TextInputWidget(
placeholder='e.g. secret_token'
),
)
username = colander.SchemaNode(
colander.String(),
title=_('Username'),
description=_('Optional username to authenticate the call.'),
default='',
missing='',
widget=deform.widget.TextInputWidget(
placeholder='e.g. admin'
),
)
password = colander.SchemaNode(
colander.String(),
title=_('Password'),
description=_('Optional password to authenticate the call.'),
default='',
missing='',
widget=deform.widget.PasswordWidget(
placeholder='e.g. secret.',
redisplay=True,
),
)
custom_header_key = colander.SchemaNode(
colander.String(),
title=_('Custom Header Key'),
description=_('Custom Header name to be set when calling endpoint.'),
default='',
missing='',
widget=deform.widget.TextInputWidget(
placeholder='e.g: Authorization'
),
)
custom_header_val = colander.SchemaNode(
colander.String(),
title=_('Custom Header Value'),
description=_('Custom Header value to be set when calling endpoint.'),
default='',
missing='',
widget=deform.widget.TextInputWidget(
placeholder='e.g. Basic XxXxXx'
),
)
method_type = colander.SchemaNode(
colander.String(),
title=_('Call Method'),
description=_('Select a HTTP method to use when calling the Webhook.'),
default='post',
missing='',
widget=deform.widget.RadioChoiceWidget(
values=[('get', 'GET'), ('post', 'POST'), ('put', 'PUT')],
inline=True
),
)
class WebhookIntegrationType(IntegrationTypeBase):
key = 'webhook'
display_name = _('Webhook')
description = _('send JSON data to a url endpoint')
@classmethod
def icon(cls):
return ''''''
valid_events = [
events.PullRequestCloseEvent,
events.PullRequestMergeEvent,
events.PullRequestUpdateEvent,
events.PullRequestCommentEvent,
events.PullRequestCommentEditEvent,
events.PullRequestReviewEvent,
events.PullRequestCreateEvent,
events.RepoPushEvent,
events.RepoCreateEvent,
events.RepoCommitCommentEvent,
events.RepoCommitCommentEditEvent,
]
def settings_schema(self):
schema = WebhookSettingsSchema()
schema.add(colander.SchemaNode(
colander.Set(),
widget=CheckboxChoiceWidgetDesc(
values=sorted(
[(e.name, e.display_name, e.description) for e in self.valid_events]
),
),
description="List of events activated for this integration",
name='events'
))
return schema
def send_event(self, event):
log.debug('handling event %s with integration %s', event.name, self)
if event.__class__ not in self.valid_events:
log.debug('event %r not present in valid event list (%s)', event, self.valid_events)
return
if not self.event_enabled(event):
return
data = event.as_dict()
template_url = self.settings['url']
headers = {}
head_key = self.settings.get('custom_header_key')
head_val = self.settings.get('custom_header_val')
if head_key and head_val:
headers = {head_key: head_val}
handler = WebhookDataHandler(template_url, headers)
url_calls = handler(event, data)
log.debug('Webhook: calling following urls: %s', [x[0] for x in url_calls])
run_task(post_to_webhook, url_calls, self.settings)
@async_task(ignore_result=True, base=RequestContextTask)
def post_to_webhook(url_calls, settings):
"""
Example data::
{'actor': {'user_id': 2, 'username': u'admin'},
'actor_ip': u'192.168.157.1',
'name': 'repo-push',
'push': {'branches': [{'name': u'default',
'url': 'http://rc.local:8080/hg-repo/changelog?branch=default'}],
'commits': [{'author': u'Marcin Kuzminski ',
'branch': u'default',
'date': datetime.datetime(2017, 11, 30, 12, 59, 48),
'issues': [],
'mentions': [],
'message': u'commit Thu 30 Nov 2017 13:59:48 CET',
'message_html': u'commit Thu 30 Nov 2017 13:59:48 CET',
'message_html_title': u'commit Thu 30 Nov 2017 13:59:48 CET',
'parents': [{'raw_id': '431b772a5353dad9974b810dd3707d79e3a7f6e0'}],
'permalink_url': u'http://rc.local:8080/_7/changeset/a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf',
'raw_id': 'a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf',
'refs': {'bookmarks': [],
'branches': [u'default'],
'tags': [u'tip']},
'reviewers': [],
'revision': 9L,
'short_id': 'a815cc738b96',
'url': u'http://rc.local:8080/hg-repo/changeset/a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf'}],
'issues': {}},
'repo': {'extra_fields': '',
'permalink_url': u'http://rc.local:8080/_7',
'repo_id': 7,
'repo_name': u'hg-repo',
'repo_type': u'hg',
'url': u'http://rc.local:8080/hg-repo'},
'server_url': u'http://rc.local:8080',
'utc_timestamp': datetime.datetime(2017, 11, 30, 13, 0, 1, 569276)
}
"""
call_headers = {
'User-Agent': 'RhodeCode-webhook-caller/{}'.format(rhodecode.__version__)
} # updated below with custom ones, allows override
auth = get_auth(settings)
token = get_web_token(settings)
for url, headers, data in url_calls:
req_session = requests_retry_call()
method = settings.get('method_type') or 'post'
call_method = getattr(req_session, method)
headers = headers or {}
call_headers.update(headers)
log.debug('calling Webhook with method: %s, and auth:%s', call_method, auth)
if settings.get('log_data'):
log.debug('calling webhook with data: %s', data)
resp = call_method(url, json={
'token': token,
'event': data
}, headers=call_headers, auth=auth, timeout=60)
log.debug('Got Webhook response: %s', resp)
try:
resp.raise_for_status() # raise exception on a failed request
except Exception:
log.error(resp.text)
raise