# -*- coding: utf-8 -*-
# Copyright (C) 2012-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
from __future__ import unicode_literals
import string
from collections import OrderedDict
import deform
import logging
import requests
import colander
from celery.task import task
from requests.packages.urllib3.util.retry import Retry
from rhodecode import events
from rhodecode.translation import _
from rhodecode.integrations.types.base import IntegrationTypeBase
log = logging.getLogger(__name__)
# updating this required to update the `base_vars` passed in url calling func
WEBHOOK_URL_VARS = [
'repo_name',
'repo_type',
'repo_id',
'repo_url',
# special attrs below that we handle, using multi-call
'branch',
'commit_id',
# pr events vars
'pull_request_id',
'pull_request_url',
]
URL_VARS = ', '.join('${' + x + '}' for x in WEBHOOK_URL_VARS)
class WebhookHandler(object):
def __init__(self, template_url, secret_token):
self.template_url = template_url
self.secret_token = secret_token
def get_base_parsed_template(self, data):
"""
initially parses the passed in template with some common variables
available on ALL calls
"""
# note: make sure to update the `WEBHOOK_URL_VARS` if this changes
common_vars = {
'repo_name': data['repo']['repo_name'],
'repo_type': data['repo']['repo_type'],
'repo_id': data['repo']['repo_id'],
'repo_url': data['repo']['url'],
}
return string.Template(
self.template_url).safe_substitute(**common_vars)
def repo_push_event_handler(self, event, data):
url = self.get_base_parsed_template(data)
url_cals = []
branch_data = OrderedDict()
for obj in data['push']['branches']:
branch_data[obj['name']] = obj
branches_commits = OrderedDict()
for commit in data['push']['commits']:
if commit['branch'] not in branches_commits:
branch_commits = {'branch': branch_data[commit['branch']],
'commits': []}
branches_commits[commit['branch']] = branch_commits
branch_commits = branches_commits[commit['branch']]
branch_commits['commits'].append(commit)
if '${branch}' in url:
# call it multiple times, for each branch if used in variables
for branch, commit_ids in branches_commits.items():
branch_url = string.Template(url).safe_substitute(branch=branch)
# call further down for each commit if used
if '${commit_id}' in branch_url:
for commit_data in commit_ids['commits']:
commit_id = commit_data['raw_id']
commit_url = string.Template(branch_url).safe_substitute(
commit_id=commit_id)
# register per-commit call
log.debug(
'register webhook call(%s) to url %s', event, commit_url)
url_cals.append((commit_url, self.secret_token, data))
else:
# register per-branch call
log.debug(
'register webhook call(%s) to url %s', event, branch_url)
url_cals.append((branch_url, self.secret_token, data))
else:
log.debug(
'register webhook call(%s) to url %s', event, url)
url_cals.append((url, self.secret_token, data))
return url_cals
def repo_create_event_handler(self, event, data):
url = self.get_base_parsed_template(data)
log.debug(
'register webhook call(%s) to url %s', event, url)
return [(url, self.secret_token, data)]
def pull_request_event_handler(self, event, data):
url = self.get_base_parsed_template(data)
log.debug(
'register webhook call(%s) to url %s', event, url)
url = string.Template(url).safe_substitute(
pull_request_id=data['pullrequest']['pull_request_id'],
pull_request_url=data['pullrequest']['url'])
return [(url, self.secret_token, data)]
def __call__(self, event, data):
if isinstance(event, events.RepoPushEvent):
return self.repo_push_event_handler(event, data)
elif isinstance(event, events.RepoCreateEvent):
return self.repo_create_event_handler(event, data)
elif isinstance(event, events.PullRequestEvent):
return self.pull_request_event_handler(event, data)
else:
raise ValueError('event type not supported: %s' % events)
class WebhookSettingsSchema(colander.Schema):
url = colander.SchemaNode(
colander.String(),
title=_('Webhook URL'),
description=
_('URL of the webhook to receive POST event. Following variables '
'are allowed to be used: {vars}. Some of the variables would '
'trigger multiple calls, like ${{branch}} or ${{commit_id}}. '
'Webhook will be called as many times as unique objects in '
'data in such cases.').format(vars=URL_VARS),
missing=colander.required,
required=True,
validator=colander.url,
widget=deform.widget.TextInputWidget(
placeholder='https://www.example.com/webhook'
),
)
secret_token = colander.SchemaNode(
colander.String(),
title=_('Secret Token'),
description=_('String used to validate received payloads.'),
default='',
missing='',
widget=deform.widget.TextInputWidget(
placeholder='secret_token'
),
)
class WebhookIntegrationType(IntegrationTypeBase):
key = 'webhook'
display_name = _('Webhook')
description = _('Post json events to a webhook endpoint')
icon = ''''''
valid_events = [
events.PullRequestCloseEvent,
events.PullRequestMergeEvent,
events.PullRequestUpdateEvent,
events.PullRequestCommentEvent,
events.PullRequestReviewEvent,
events.PullRequestCreateEvent,
events.RepoPushEvent,
events.RepoCreateEvent,
]
def settings_schema(self):
schema = WebhookSettingsSchema()
schema.add(colander.SchemaNode(
colander.Set(),
widget=deform.widget.CheckboxChoiceWidget(
values=sorted(
[(e.name, e.display_name) for e in self.valid_events]
)
),
description="Events activated for this integration",
name='events'
))
return schema
def send_event(self, event):
log.debug('handling event %s with webhook integration %s',
event.name, self)
if event.__class__ not in self.valid_events:
log.debug('event not valid: %r' % event)
return
if event.name not in self.settings['events']:
log.debug('event ignored: %r' % event)
return
data = event.as_dict()
template_url = self.settings['url']
handler = WebhookHandler(template_url, self.settings['secret_token'])
url_calls = handler(event, data)
log.debug('webhook: calling following urls: %s',
[x[0] for x in url_calls])
post_to_webhook(url_calls)
@task(ignore_result=True)
def post_to_webhook(url_calls):
max_retries = 3
for url, token, data in url_calls:
# retry max N times
retries = Retry(
total=max_retries,
backoff_factor=0.15,
status_forcelist=[500, 502, 503, 504])
req_session = requests.Session()
req_session.mount(
'http://', requests.adapters.HTTPAdapter(max_retries=retries))
resp = req_session.post(url, json={
'token': token,
'event': data
})
resp.raise_for_status() # raise exception on a failed request