# -*- coding: utf-8 -*-
# Copyright (C) 2012-2018 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
from __future__ import unicode_literals
import string
import collections
import deform
import deform.widget
import logging
import requests
import requests.adapters
import colander
from requests.packages.urllib3.util.retry import Retry
import rhodecode
from rhodecode import events
from rhodecode.translation import _
from rhodecode.integrations.types.base import (
IntegrationTypeBase, get_auth, get_url_vars, WEBHOOK_URL_VARS)
from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
from rhodecode.model.validation_schema import widgets
# Module-level logger shared by the handler, the integration type and the task.
log = logging.getLogger(__name__)
# Rendered list of supported URL variables, shown in the help block of the
# `url` field of the settings schema.
# NOTE: whenever `WEBHOOK_URL_VARS` changes, the `common_vars` mapping built in
# `WebhookHandler.get_base_parsed_template` has to be updated to match.
URL_VARS = get_url_vars(WEBHOOK_URL_VARS)
class WebhookHandler(object):
    """
    Expands a webhook URL template into the concrete calls for one event.

    Calling an instance with an event and its serialized ``data`` returns a
    list of ``(url, secret_token, headers, data)`` tuples, one per call that
    should be made.
    """

    def __init__(self, template_url, secret_token, headers):
        """
        :param template_url: URL that may contain ``${var}`` placeholders.
        :param secret_token: value placed as second item of every call tuple.
        :param headers: value placed as third item of every call tuple.
        """
        self.template_url = template_url
        self.secret_token = secret_token
        self.headers = headers

    def get_base_parsed_template(self, data):
        """
        Initially parses the template with the common variables that are
        available on ALL calls.

        Placeholders that are not known here (e.g. ``${branch}``) are left
        untouched so that the event specific handlers can fill them later.

        :param data: serialized event; reads ``data['repo']``,
            ``data['actor']`` and ``data['name']``.
        :returns: the template url with the common variables substituted.
        """
        repo = data['repo']
        actor = data['actor']

        # note: make sure to update the `WEBHOOK_URL_VARS` if this changes
        common_vars = {
            'repo_name': repo['repo_name'],
            'repo_type': repo['repo_type'],
            'repo_id': repo['repo_id'],
            'repo_url': repo['url'],
            'username': actor['username'],
            'user_id': actor['user_id'],
            'event_name': data['name']
        }

        # `extra_fields` can be missing or an empty string instead of a
        # mapping; treat both as "no extra fields" instead of crashing.
        extra_fields = repo.get('extra_fields') or {}
        for extra_key, extra_val in extra_fields.items():
            common_vars['extra__{}'.format(extra_key)] = extra_val

        # `${extra:key}` is not a valid string.Template placeholder, so it is
        # rewritten to the `${extra__key}` form that was registered above.
        template_url = self.template_url.replace('${extra:', '${extra__')
        return string.Template(template_url).safe_substitute(common_vars)

    def repo_push_event_handler(self, event, data):
        """
        Build the calls for a push event.

        When the url uses ``${branch}`` and/or ``${commit_id}`` one call is
        registered per pushed branch, respectively per pushed commit;
        otherwise a single call is registered.

        :returns: list of ``(url, secret_token, headers, data)`` tuples.
        """
        url = self.get_base_parsed_template(data)
        url_calls = []

        branch_data = collections.OrderedDict(
            (obj['name'], obj) for obj in data['push']['branches'])

        # group the pushed commits per branch, keeping the push order
        branches_commits = collections.OrderedDict()
        for commit in data['push']['commits']:
            if commit.get('git_ref_change'):
                # special case for GIT that allows creating tags,
                # deleting branches without associated commit
                continue

            branch_name = commit['branch']
            if branch_name not in branches_commits:
                branches_commits[branch_name] = {
                    'branch': branch_data[branch_name],
                    'commits': []}
            branches_commits[branch_name]['commits'].append(commit)

        if '${branch}' not in url and '${commit_id}' not in url:
            # no per-object variables used, a single call is enough
            log.debug(
                'register webhook call(%s) to url %s', event, url)
            url_calls.append((url, self.secret_token, self.headers, data))
            return url_calls

        # call it multiple times, for each branch if used in variables
        for branch, branch_commits in branches_commits.items():
            branch_url = string.Template(url).safe_substitute(branch=branch)

            if '${commit_id}' not in branch_url:
                # register per-branch call
                log.debug(
                    'register webhook call(%s) to url %s', event, branch_url)
                url_calls.append(
                    (branch_url, self.secret_token, self.headers, data))
                continue

            # call further down for each commit if used
            for commit_data in branch_commits['commits']:
                commit_url = string.Template(branch_url).safe_substitute(
                    commit_id=commit_data['raw_id'])
                # register per-commit call
                log.debug(
                    'register webhook call(%s) to url %s', event, commit_url)
                url_calls.append(
                    (commit_url, self.secret_token, self.headers, data))

        return url_calls

    def repo_create_event_handler(self, event, data):
        """
        Build the single call for a repository creation event.

        :returns: one-element list with a
            ``(url, secret_token, headers, data)`` tuple.
        """
        url = self.get_base_parsed_template(data)
        log.debug(
            'register webhook call(%s) to url %s', event, url)
        return [(url, self.secret_token, self.headers, data)]

    def pull_request_event_handler(self, event, data):
        """
        Build the single call for a pull request event, additionally filling
        ``${pull_request_id}``, ``${pull_request_url}`` and
        ``${pull_request_shadow_url}``.

        :returns: one-element list with a
            ``(url, secret_token, headers, data)`` tuple.
        """
        pull_request = data['pullrequest']
        url = string.Template(
            self.get_base_parsed_template(data)).safe_substitute(
                pull_request_id=pull_request['pull_request_id'],
                pull_request_url=pull_request['url'],
                pull_request_shadow_url=pull_request['shadow_url'],)
        # log after substitution so the message shows the url really called
        log.debug(
            'register webhook call(%s) to url %s', event, url)
        return [(url, self.secret_token, self.headers, data)]

    def __call__(self, event, data):
        """
        Dispatch to the handler matching the type of `event`.

        :returns: list of ``(url, secret_token, headers, data)`` tuples.
        :raises ValueError: when the event type has no handler.
        """
        if isinstance(event, events.RepoPushEvent):
            return self.repo_push_event_handler(event, data)
        elif isinstance(event, events.RepoCreateEvent):
            return self.repo_create_event_handler(event, data)
        elif isinstance(event, events.PullRequestEvent):
            return self.pull_request_event_handler(event, data)
        else:
            # report the offending event, not the `events` module
            raise ValueError('event type not supported: %s' % (event,))
class WebhookSettingsSchema(colander.Schema):
    """
    Settings form of the Webhook integration.

    Declares the target url, the optional secret token, the optional
    username/password, one optional custom header and the HTTP method.
    """

    # Target url template; required and validated with `colander.url`.
    # In the help block the doubled braces (`${{...}}`) are `str.format`
    # escapes, so the user sees literal `${...}` placeholders followed by
    # the rendered URL_VARS list.
    url = colander.SchemaNode(
        colander.String(),
        title=_('Webhook URL'),
        description=
            _('URL to which Webhook should submit data. If used some of the '
              'variables would trigger multiple calls, like ${branch} or '
              '${commit_id}. Webhook will be called as many times as unique '
              'objects in data in such cases.'),
        missing=colander.required,
        required=True,
        validator=colander.url,
        widget=widgets.CodeMirrorWidget(
            help_block_collapsable_name='Show url variables',
            help_block_collapsable=(
                'E.g http://my-serv/trigger_job/${{event_name}}'
                '?PR_ID=${{pull_request_id}}'
                '\nFull list of vars:\n{}'.format(URL_VARS)),
            codemirror_mode='text',
            codemirror_options='{"lineNumbers": false, "lineWrapping": true}'),
    )
    # Optional token; empty string when not provided.
    secret_token = colander.SchemaNode(
        colander.String(),
        title=_('Secret Token'),
        description=_('Optional string used to validate received payloads. '
                      'It will be sent together with event data in JSON'),
        default='',
        missing='',
        widget=deform.widget.TextInputWidget(
            placeholder='e.g. secret_token'
        ),
    )
    # Optional username; empty string when not provided.
    username = colander.SchemaNode(
        colander.String(),
        title=_('Username'),
        description=_('Optional username to authenticate the call.'),
        default='',
        missing='',
        widget=deform.widget.TextInputWidget(
            placeholder='e.g. admin'
        ),
    )
    # Optional password; `redisplay=True` is passed to the password widget.
    password = colander.SchemaNode(
        colander.String(),
        title=_('Password'),
        description=_('Optional password to authenticate the call.'),
        default='',
        missing='',
        widget=deform.widget.PasswordWidget(
            placeholder='e.g. secret.',
            redisplay=True,
        ),
    )
    # Name of a single optional custom header.
    custom_header_key = colander.SchemaNode(
        colander.String(),
        title=_('Custom Header Key'),
        description=_('Custom Header name to be set when calling endpoint.'),
        default='',
        missing='',
        widget=deform.widget.TextInputWidget(
            placeholder='e.g: Authorization'
        ),
    )
    # Value of the optional custom header.
    custom_header_val = colander.SchemaNode(
        colander.String(),
        title=_('Custom Header Value'),
        description=_('Custom Header value to be set when calling endpoint.'),
        default='',
        missing='',
        widget=deform.widget.TextInputWidget(
            placeholder='e.g. Basic XxXxXx'
        ),
    )
    # HTTP method choice; the form defaults to 'post' but a missing value is
    # stored as '' (`post_to_webhook` falls back to 'post' for empty values).
    method_type = colander.SchemaNode(
        colander.String(),
        title=_('Call Method'),
        description=_('Select if the Webhook call should be made '
                      'with POST or GET.'),
        default='post',
        missing='',
        widget=deform.widget.RadioChoiceWidget(
            values=[('get', 'GET'), ('post', 'POST')],
            inline=True
        ),
    )
class WebhookIntegrationType(IntegrationTypeBase):
    """
    Integration type that turns selected events into Webhook calls.
    """
    key = 'webhook'
    display_name = _('Webhook')
    description = _('Post json events to a Webhook endpoint')

    @classmethod
    def icon(cls):
        """Return the icon markup of this integration (currently empty)."""
        return ''''''

    # event classes this integration is able to handle
    valid_events = [
        events.PullRequestCloseEvent,
        events.PullRequestMergeEvent,
        events.PullRequestUpdateEvent,
        events.PullRequestCommentEvent,
        events.PullRequestReviewEvent,
        events.PullRequestCreateEvent,
        events.RepoPushEvent,
        events.RepoCreateEvent,
    ]

    def settings_schema(self):
        """
        Return the settings schema extended with an `events` node, a
        checkbox choice over `valid_events`.
        """
        schema = WebhookSettingsSchema()
        schema.add(colander.SchemaNode(
            colander.Set(),
            widget=deform.widget.CheckboxChoiceWidget(
                values=sorted(
                    [(e.name, e.display_name) for e in self.valid_events]
                )
            ),
            description="Events activated for this integration",
            name='events'
        ))
        return schema

    def send_event(self, event):
        """
        Schedule the Webhook calls for `event`.

        Events whose class is not in `valid_events`, or whose name is not
        enabled in the `events` setting, are ignored. For all others the
        url calls are computed by `WebhookHandler` and handed over to the
        `post_to_webhook` task.
        """
        log.debug('handling event %s with Webhook integration %s',
                  event.name, self)

        if event.__class__ not in self.valid_events:
            # pass the argument to the logger instead of pre-formatting, so
            # the message is only rendered when debug logging is enabled
            log.debug('event not valid: %r', event)
            return

        if event.name not in self.settings['events']:
            log.debug('event ignored: %r', event)
            return

        data = event.as_dict()
        template_url = self.settings['url']

        # a custom header is only sent when both its name and value are set
        headers = {}
        head_key = self.settings.get('custom_header_key')
        head_val = self.settings.get('custom_header_val')
        if head_key and head_val:
            headers = {head_key: head_val}

        handler = WebhookHandler(
            template_url, self.settings['secret_token'], headers)
        url_calls = handler(event, data)
        log.debug('webhook: calling following urls: %s',
                  [x[0] for x in url_calls])
        run_task(post_to_webhook, url_calls, self.settings)
@async_task(ignore_result=True, base=RequestContextTask)
def post_to_webhook(url_calls, settings):
    """
    Perform the prepared Webhook calls one after another.

    Every call sends ``{'token': token, 'event': data}`` as JSON using the
    HTTP method from ``settings['method_type']`` ('post' when empty).
    Responses with status 500, 502, 503 or 504 are retried up to 3 times,
    for both http and https urls.

    :param url_calls: iterable of ``(url, token, headers, data)`` tuples.
    :param settings: integration settings; `method_type` and `log_data` are
        read here, the rest is passed on to `get_auth`.
    :raises: whatever `raise_for_status` raises for the first failed
        response; the calls after it are not executed.

    Example data::

        {'actor': {'user_id': 2, 'username': u'admin'},
         'actor_ip': u'192.168.157.1',
         'name': 'repo-push',
         'push': {'branches': [{'name': u'default',
                                'url': 'http://rc.local:8080/hg-repo/changelog?branch=default'}],
                  'commits': [{'author': u'Marcin Kuzminski ',
                               'branch': u'default',
                               'date': datetime.datetime(2017, 11, 30, 12, 59, 48),
                               'issues': [],
                               'mentions': [],
                               'message': u'commit Thu 30 Nov 2017 13:59:48 CET',
                               'message_html': u'commit Thu 30 Nov 2017 13:59:48 CET',
                               'message_html_title': u'commit Thu 30 Nov 2017 13:59:48 CET',
                               'parents': [{'raw_id': '431b772a5353dad9974b810dd3707d79e3a7f6e0'}],
                               'permalink_url': u'http://rc.local:8080/_7/changeset/a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf',
                               'raw_id': 'a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf',
                               'refs': {'bookmarks': [], 'branches': [u'default'], 'tags': [u'tip']},
                               'reviewers': [],
                               'revision': 9L,
                               'short_id': 'a815cc738b96',
                               'url': u'http://rc.local:8080/hg-repo/changeset/a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf'}],
                  'issues': {}},
         'repo': {'extra_fields': '',
                  'permalink_url': u'http://rc.local:8080/_7',
                  'repo_id': 7,
                  'repo_name': u'hg-repo',
                  'repo_type': u'hg',
                  'url': u'http://rc.local:8080/hg-repo'},
         'server_url': u'http://rc.local:8080',
         'utc_timestamp': datetime.datetime(2017, 11, 30, 13, 0, 1, 569276)}
    """
    max_retries = 3
    # seconds; without a timeout a hanging endpoint blocks the worker forever
    request_timeout = 60

    retries = Retry(
        total=max_retries,
        backoff_factor=0.15,
        status_forcelist=[500, 502, 503, 504])
    base_headers = {
        'User-Agent': 'RhodeCode-webhook-caller/{}'.format(
            rhodecode.__version__)
    }  # copied per call and updated with custom ones, allows override

    auth = get_auth(settings)
    method = settings.get('method_type') or 'post'

    # one session for all calls, closed when done
    with requests.Session() as req_session:
        adapter = requests.adapters.HTTPAdapter(max_retries=retries)
        # retry max N times, regardless of the url scheme
        for scheme in ('http://', 'https://'):
            req_session.mount(scheme, adapter)
        call_method = getattr(req_session, method)

        for url, token, headers, data in url_calls:
            # fresh dict per call, so custom headers of one call never leak
            # into the following ones
            call_headers = dict(base_headers)
            call_headers.update(headers or {})

            log.debug('calling Webhook with method: %s, and auth:%s',
                      call_method, auth)
            if settings.get('log_data'):
                log.debug('calling webhook with data: %s', data)

            resp = call_method(url, json={
                'token': token,
                'event': data
            }, headers=call_headers, auth=auth, timeout=request_timeout)
            log.debug('Got Webhook response: %s', resp)

            try:
                resp.raise_for_status()  # raise exception on a failed request
            except Exception:
                log.error(resp.text)
                raise