webhook.py
111 lines
| 3.6 KiB
| text/x-python
|
PythonLexer
r444 | # -*- coding: utf-8 -*- | |||
# Copyright (C) 2012-2016 RhodeCode GmbH | ||||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
from __future__ import unicode_literals | ||||
r518 | import deform | |||
r444 | import logging | |||
import requests | ||||
import colander | ||||
from celery.task import task | ||||
from mako.template import Template | ||||
from rhodecode import events | ||||
from rhodecode.translation import lazy_ugettext | ||||
from rhodecode.integrations.types.base import IntegrationTypeBase | ||||
from rhodecode.integrations.schema import IntegrationSettingsSchemaBase | ||||
Martin Bornhold
|
r504 | log = logging.getLogger(__name__) | ||
r444 | ||||
class WebhookSettingsSchema(IntegrationSettingsSchemaBase): | ||||
url = colander.SchemaNode( | ||||
colander.String(), | ||||
title=lazy_ugettext('Webhook URL'), | ||||
description=lazy_ugettext('URL of the webhook to receive POST event.'), | ||||
default='', | ||||
validator=colander.url, | ||||
r518 | widget=deform.widget.TextInputWidget( | |||
placeholder='https://www.example.com/webhook' | ||||
), | ||||
r444 | ) | |||
secret_token = colander.SchemaNode( | ||||
colander.String(), | ||||
title=lazy_ugettext('Secret Token'), | ||||
description=lazy_ugettext('String used to validate received payloads.'), | ||||
default='', | ||||
r518 | widget=deform.widget.TextInputWidget( | |||
placeholder='secret_token' | ||||
), | ||||
r444 | ) | |||
class WebhookIntegrationType(IntegrationTypeBase): | ||||
key = 'webhook' | ||||
display_name = lazy_ugettext('Webhook') | ||||
valid_events = [ | ||||
events.PullRequestCloseEvent, | ||||
events.PullRequestMergeEvent, | ||||
events.PullRequestUpdateEvent, | ||||
events.PullRequestCommentEvent, | ||||
events.PullRequestReviewEvent, | ||||
events.PullRequestCreateEvent, | ||||
events.RepoPushEvent, | ||||
events.RepoCreateEvent, | ||||
] | ||||
r518 | def settings_schema(self): | |||
r444 | schema = WebhookSettingsSchema() | |||
schema.add(colander.SchemaNode( | ||||
colander.Set(), | ||||
r518 | widget=deform.widget.CheckboxChoiceWidget( | |||
values=sorted( | ||||
[(e.name, e.display_name) for e in self.valid_events] | ||||
) | ||||
), | ||||
r444 | description="Events activated for this integration", | |||
name='events' | ||||
)) | ||||
return schema | ||||
def send_event(self, event): | ||||
log.debug('handling event %s with webhook integration %s', | ||||
event.name, self) | ||||
if event.__class__ not in self.valid_events: | ||||
log.debug('event not valid: %r' % event) | ||||
return | ||||
if event.name not in self.settings['events']: | ||||
log.debug('event ignored: %r' % event) | ||||
return | ||||
data = event.as_dict() | ||||
post_to_webhook(data, self.settings) | ||||
@task(ignore_result=True) | ||||
def post_to_webhook(data, settings): | ||||
log.debug('sending event:%s to webhook %s', data['name'], settings['url']) | ||||
resp = requests.post(settings['url'], json={ | ||||
'token': settings['secret_token'], | ||||
'event': data | ||||
}) | ||||
resp.raise_for_status() # raise exception on a failed request | ||||