# -*- coding: utf-8 -*-
# Copyright (C) 2012-2018 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
from __future__ import unicode_literals
import string
from collections import OrderedDict
import deform
import deform.widget
import logging
import requests
import requests.adapters
import colander
from requests.packages.urllib3.util.retry import Retry
import rhodecode
from rhodecode import events
from rhodecode.translation import _
from rhodecode.integrations.types.base import IntegrationTypeBase
from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
log = logging.getLogger(__name__)

# Names of the placeholders accepted inside the webhook URL template.
# NOTE: whenever this list changes, the `common_vars` passed to the url
# calling function have to be updated as well.
WEBHOOK_URL_VARS = [
    # plain repository attributes
    'repo_name',
    'repo_type',
    'repo_id',
    'repo_url',

    # prefix for the extra repo fields
    'extra:',

    # special attributes, handled by issuing multiple calls
    'branch',
    'commit_id',

    # variables available for pull request events
    'pull_request_id',
    'pull_request_url',

    # the user who triggered the call
    'username',
    'user_id',
]

# Human readable listing of all placeholders, e.g. "${repo_name}, ${repo_type}"
URL_VARS = ', '.join(
    '${{{}}}'.format(var_name) for var_name in WEBHOOK_URL_VARS)
def get_auth(settings):
    """
    Build the HTTP basic auth object for a webhook call.

    :param settings: mapping of integration settings; the `username` and
        `password` keys are read with `.get()`.
    :return: `HTTPBasicAuth` instance when both values are non-empty,
        otherwise `None`.
    """
    user = settings.get('username')
    secret = settings.get('password')
    if not (user and secret):
        # credentials are optional, without both of them no auth is used
        return None

    from requests.auth import HTTPBasicAuth
    return HTTPBasicAuth(user, secret)
class WebhookHandler(object):
    """
    Expands a webhook URL template into the concrete calls for an event.

    Instances are callable: ``handler(event, data)`` returns a list of
    ``(url, secret_token, headers, data)`` tuples, one tuple per HTTP call
    that should be made.
    """

    def __init__(self, template_url, secret_token, headers):
        """
        :param template_url: URL that may contain ``${var}`` placeholders
        :param secret_token: token passed through unchanged in every call
        :param headers: headers passed through unchanged in every call
        """
        self.template_url = template_url
        self.secret_token = secret_token
        self.headers = headers

    def _build_call(self, event, url, data):
        """
        Logs and returns the call tuple for a single, fully resolved url.
        """
        log.debug('register webhook call(%s) to url %s', event, url)
        return url, self.secret_token, self.headers, data

    def get_base_parsed_template(self, data):
        """
        initially parses the passed in template with some common variables
        available on ALL calls

        Placeholders that are not known here (e.g. ``${branch}``) are left
        untouched so event specific handlers can substitute them later.

        :param data: event data, the `repo` and `actor` entries are read
        :return: template url with the common variables substituted
        """
        repo = data['repo']
        actor = data['actor']

        # note: make sure to update the `WEBHOOK_URL_VARS` if this changes
        common_vars = {
            'repo_name': repo['repo_name'],
            'repo_type': repo['repo_type'],
            'repo_id': repo['repo_id'],
            'repo_url': repo['url'],
            'username': actor['username'],
            'user_id': actor['user_id']
        }

        # `extra_fields` may be missing or an empty value (e.g. '') when
        # the repository defines no extra fields, treat that as "no extras"
        extra_fields = repo.get('extra_fields') or {}
        for extra_key, extra_val in extra_fields.items():
            common_vars['extra__{}'.format(extra_key)] = extra_val

        # `:` is not allowed in string.Template identifiers, so the
        # user-facing `${extra:name}` form is mapped onto `${extra__name}`
        template_url = self.template_url.replace('${extra:', '${extra__')

        # pass the mapping positionally, extra keys are arbitrary strings
        # and should not be forced through keyword arguments
        return string.Template(template_url).safe_substitute(common_vars)

    def repo_push_event_handler(self, event, data):
        """
        Builds the calls for a push event.

        When the url uses ``${branch}`` and/or ``${commit_id}`` one call is
        registered per pushed branch, or per pushed commit if
        ``${commit_id}`` is used. Otherwise a single call is registered.

        :return: list of ``(url, secret_token, headers, data)`` tuples
        """
        url = self.get_base_parsed_template(data)
        url_calls = []

        branch_data = OrderedDict(
            (obj['name'], obj) for obj in data['push']['branches'])

        # group the pushed commits per branch, keeping the push order
        branches_commits = OrderedDict()
        for commit in data['push']['commits']:
            if commit.get('git_ref_change'):
                # special case for GIT that allows creating tags,
                # deleting branches without associated commit
                continue

            branch_name = commit['branch']
            if branch_name not in branches_commits:
                branches_commits[branch_name] = {
                    # a commit may reference a branch that is not part of
                    # the pushed branches, don't fail on that
                    'branch': branch_data.get(branch_name),
                    'commits': []}
            branches_commits[branch_name]['commits'].append(commit)

        # `${commit_id}` needs the per-commit expansion even if the url
        # does not use `${branch}`, otherwise the placeholder would be
        # sent out unsubstituted
        if '${branch}' in url or '${commit_id}' in url:
            # call it multiple times, for each branch if used in variables
            for branch_name, branch_commits in branches_commits.items():
                branch_url = string.Template(url).safe_substitute(
                    branch=branch_name)

                if '${commit_id}' not in branch_url:
                    # register per-branch call
                    url_calls.append(
                        self._build_call(event, branch_url, data))
                    continue

                # register per-commit calls
                for commit_data in branch_commits['commits']:
                    commit_url = string.Template(branch_url).safe_substitute(
                        commit_id=commit_data['raw_id'])
                    url_calls.append(
                        self._build_call(event, commit_url, data))
        else:
            url_calls.append(self._build_call(event, url, data))

        return url_calls

    def repo_create_event_handler(self, event, data):
        """
        Builds the single call for a repository create event.

        :return: list with one ``(url, secret_token, headers, data)`` tuple
        """
        url = self.get_base_parsed_template(data)
        return [self._build_call(event, url, data)]

    def pull_request_event_handler(self, event, data):
        """
        Builds the single call for a pull request event, substituting the
        ``${pull_request_id}`` and ``${pull_request_url}`` variables.

        :return: list with one ``(url, secret_token, headers, data)`` tuple
        """
        url = self.get_base_parsed_template(data)
        url = string.Template(url).safe_substitute(
            pull_request_id=data['pullrequest']['pull_request_id'],
            pull_request_url=data['pullrequest']['url'])
        # logged by `_build_call` after substitution, so the log shows the
        # url that is actually going to be called
        return [self._build_call(event, url, data)]

    def __call__(self, event, data):
        """
        Dispatches to the handler matching the type of `event`.

        :raises ValueError: when the event type has no handler
        """
        if isinstance(event, events.RepoPushEvent):
            return self.repo_push_event_handler(event, data)
        elif isinstance(event, events.RepoCreateEvent):
            return self.repo_create_event_handler(event, data)
        elif isinstance(event, events.PullRequestEvent):
            return self.pull_request_event_handler(event, data)
        else:
            # report the offending event, not the `events` module
            raise ValueError('event type not supported: %s' % (event,))
class WebhookSettingsSchema(colander.Schema):
    """
    Settings schema of the webhook integration.

    Declares the target url, an optional secret token, optional basic auth
    credentials, one optional custom header and the HTTP method to use.
    """

    # Target url template; the only required field, validated with
    # `colander.url`. The description lists the placeholders from URL_VARS.
    url = colander.SchemaNode(
        colander.String(),
        title=_('Webhook URL'),
        description=
            _('URL to which Webhook should submit data. Following variables '
              'are allowed to be used: {vars}. Some of the variables would '
              'trigger multiple calls, like ${{branch}} or ${{commit_id}}. '
              'Webhook will be called as many times as unique objects in '
              'data in such cases.').format(vars=URL_VARS),
        missing=colander.required,
        required=True,
        validator=colander.url,
        widget=deform.widget.TextInputWidget(
            placeholder='https://www.example.com/webhook'
        ),
    )
    # Optional token, defaults to an empty string; `post_to_webhook` sends
    # it under the `token` key of the JSON body.
    secret_token = colander.SchemaNode(
        colander.String(),
        title=_('Secret Token'),
        description=_('Optional string used to validate received payloads. '
                      'It will be sent together with event data in JSON'),
        default='',
        missing='',
        widget=deform.widget.TextInputWidget(
            placeholder='e.g. secret_token'
        ),
    )
    # Optional credentials; `get_auth` only builds basic auth when both
    # username and password are non-empty.
    username = colander.SchemaNode(
        colander.String(),
        title=_('Username'),
        description=_('Optional username to authenticate the call.'),
        default='',
        missing='',
        widget=deform.widget.TextInputWidget(
            placeholder='e.g. admin'
        ),
    )
    password = colander.SchemaNode(
        colander.String(),
        title=_('Password'),
        description=_('Optional password to authenticate the call.'),
        default='',
        missing='',
        widget=deform.widget.PasswordWidget(
            placeholder='e.g. secret.',
            redisplay=True,
        ),
    )
    # Optional single custom header; `send_event` only sets it when both
    # the key and the value are non-empty.
    custom_header_key = colander.SchemaNode(
        colander.String(),
        title=_('Custom Header Key'),
        description=_('Custom Header name to be set when calling endpoint.'),
        default='',
        missing='',
        widget=deform.widget.TextInputWidget(
            placeholder='e.g.Authorization'
        ),
    )
    custom_header_val = colander.SchemaNode(
        colander.String(),
        title=_('Custom Header Value'),
        description=_('Custom Header value to be set when calling endpoint.'),
        default='',
        missing='',
        widget=deform.widget.TextInputWidget(
            placeholder='e.g. RcLogin auth=xxxx'
        ),
    )
    # HTTP method, 'get' or 'post'. A missing value is stored as '' and
    # `post_to_webhook` falls back to 'post' in that case.
    method_type = colander.SchemaNode(
        colander.String(),
        title=_('Call Method'),
        description=_('Select if the Webhook call should be made '
                      'with POST or GET.'),
        default='post',
        missing='',
        widget=deform.widget.RadioChoiceWidget(
            values=[('get', 'GET'), ('post', 'POST')],
            inline=True
        ),
    )
class WebhookIntegrationType(IntegrationTypeBase):
    """
    Integration that forwards selected events to a Webhook endpoint.

    The url calls are computed by `WebhookHandler` and executed by the
    `post_to_webhook` task.
    """
    key = 'webhook'
    display_name = _('Webhook')
    description = _('Post json events to a Webhook endpoint')
    icon = ''''''

    # event classes this integration is able to handle
    valid_events = [
        events.PullRequestCloseEvent,
        events.PullRequestMergeEvent,
        events.PullRequestUpdateEvent,
        events.PullRequestCommentEvent,
        events.PullRequestReviewEvent,
        events.PullRequestCreateEvent,
        events.RepoPushEvent,
        events.RepoCreateEvent,
    ]

    def settings_schema(self):
        """
        Returns `WebhookSettingsSchema` extended with an `events` node, a
        checkbox choice built from the sorted `valid_events`.
        """
        schema = WebhookSettingsSchema()
        schema.add(colander.SchemaNode(
            colander.Set(),
            widget=deform.widget.CheckboxChoiceWidget(
                values=sorted(
                    [(e.name, e.display_name) for e in self.valid_events]
                )
            ),
            description="Events activated for this integration",
            name='events'
        ))
        return schema

    def send_event(self, event):
        """
        Computes the url calls for `event` and hands them to the
        `post_to_webhook` task.

        Events whose class is not in `valid_events`, or whose name is not
        enabled in the `events` setting, are skipped.
        """
        log.debug('handling event %s with Webhook integration %s',
                  event.name, self)

        # pass arguments to the logger instead of pre-formatting, so the
        # repr is only computed when debug logging is enabled
        if event.__class__ not in self.valid_events:
            log.debug('event not valid: %r', event)
            return

        if event.name not in self.settings['events']:
            log.debug('event ignored: %r', event)
            return

        data = event.as_dict()
        template_url = self.settings['url']

        # the custom header is only used when both key and value are set
        headers = {}
        head_key = self.settings.get('custom_header_key')
        head_val = self.settings.get('custom_header_val')
        if head_key and head_val:
            headers = {head_key: head_val}

        handler = WebhookHandler(
            template_url, self.settings['secret_token'], headers)

        url_calls = handler(event, data)
        log.debug('webhook: calling following urls: %s',
                  [x[0] for x in url_calls])
        run_task(post_to_webhook, url_calls, self.settings)
@async_task(ignore_result=True, base=RequestContextTask)
def post_to_webhook(url_calls, settings):
    """
    Executes the given webhook calls one after another.

    Every call sends a JSON body of the form
    ``{'token': <token>, 'event': <data>}`` using the HTTP method from the
    `method_type` setting (``post`` when empty). A failed response raises
    via `raise_for_status()`, which also stops the remaining calls.

    :param url_calls: iterable of ``(url, token, headers, data)`` tuples
    :param settings: integration settings, read for `method_type`,
        `username`, `password` and `log_data`

    Example data::

        {'actor': {'user_id': 2, 'username': u'admin'},
         'actor_ip': u'192.168.157.1',
         'name': 'repo-push',
         'push': {'branches': [{'name': u'default',
                                'url': 'http://rc.local:8080/hg-repo/changelog?branch=default'}],
                  'commits': [{'author': u'Marcin Kuzminski ',
                               'branch': u'default',
                               'date': datetime.datetime(2017, 11, 30, 12, 59, 48),
                               'issues': [],
                               'mentions': [],
                               'message': u'commit Thu 30 Nov 2017 13:59:48 CET',
                               'message_html': u'commit Thu 30 Nov 2017 13:59:48 CET',
                               'message_html_title': u'commit Thu 30 Nov 2017 13:59:48 CET',
                               'parents': [{'raw_id': '431b772a5353dad9974b810dd3707d79e3a7f6e0'}],
                               'permalink_url': u'http://rc.local:8080/_7/changeset/a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf',
                               'raw_id': 'a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf',
                               'refs': {'bookmarks': [], 'branches': [u'default'], 'tags': [u'tip']},
                               'reviewers': [],
                               'revision': 9L,
                               'short_id': 'a815cc738b96',
                               'url': u'http://rc.local:8080/hg-repo/changeset/a815cc738b9651eb5ffbcfb1ce6ccd7c701a5ddf'}],
                  'issues': {}},
         'repo': {'extra_fields': '',
                  'permalink_url': u'http://rc.local:8080/_7',
                  'repo_id': 7,
                  'repo_name': u'hg-repo',
                  'repo_type': u'hg',
                  'url': u'http://rc.local:8080/hg-repo'},
         'server_url': u'http://rc.local:8080',
         'utc_timestamp': datetime.datetime(2017, 11, 30, 13, 0, 1, 569276)}
    """
    max_retries = 3
    retries = Retry(
        total=max_retries,
        backoff_factor=0.15,
        status_forcelist=[500, 502, 503, 504])

    base_headers = {
        'User-Agent': 'RhodeCode-webhook-caller/{}'.format(
            rhodecode.__version__)
    }  # custom headers are layered on top per call, allows override

    # these only depend on the settings, compute them once
    method = settings.get('method_type') or 'post'
    auth = get_auth(settings)

    for url, token, headers, data in url_calls:
        # fresh copy for every call, so custom headers of one call never
        # leak into the following ones
        call_headers = dict(base_headers)
        call_headers.update(headers or {})

        # the context manager closes the session (and its connections)
        # also when the call raises
        with requests.Session() as req_session:
            # retry max N times, for plain and TLS endpoints alike
            for scheme in ('http://', 'https://'):
                req_session.mount(
                    scheme,
                    requests.adapters.HTTPAdapter(max_retries=retries))

            call_method = getattr(req_session, method)

            log.debug('calling Webhook with method: %s, and auth:%s',
                      call_method, auth)
            if settings.get('log_data'):
                log.debug('calling webhook with data: %s', data)

            resp = call_method(url, json={
                'token': token,
                'event': data
            }, headers=call_headers, auth=auth)
            log.debug('Got Webhook response: %s', resp)

            resp.raise_for_status()  # raise exception on a failed request