##// END OF EJS Templates
svn: Update and add doc strings and comments.
svn: Update and add doc strings and comments.

File last commit:

r518:9f9ffd3a default
r560:01b438e0 default
Show More
webhook.py
111 lines | 3.6 KiB | text/x-python | PythonLexer
dan
integrations: add webhook integration
r444 # -*- coding: utf-8 -*-
# Copyright (C) 2012-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
from __future__ import unicode_literals
dan
forms: add deform for integration settings forms
r518 import deform
dan
integrations: add webhook integration
r444 import logging
import requests
import colander
from celery.task import task
from mako.template import Template
from rhodecode import events
from rhodecode.translation import lazy_ugettext
from rhodecode.integrations.types.base import IntegrationTypeBase
from rhodecode.integrations.schema import IntegrationSettingsSchemaBase
Martin Bornhold
logging: Use __name__ when requesting a logger
r504 log = logging.getLogger(__name__)
dan
integrations: add webhook integration
r444
class WebhookSettingsSchema(IntegrationSettingsSchemaBase):
    """
    Settings form schema for the webhook integration.

    Adds two string nodes on top of ``IntegrationSettingsSchemaBase``:
    the target ``url`` and a ``secret_token``.
    """

    # Target endpoint for the webhook; the value is checked with
    # ``colander.url``.
    url = colander.SchemaNode(
        colander.String(),
        widget=deform.widget.TextInputWidget(
            placeholder='https://www.example.com/webhook'
        ),
        validator=colander.url,
        default='',
        title=lazy_ugettext('Webhook URL'),
        description=lazy_ugettext(
            'URL of the webhook to receive POST event.'),
    )

    # Free-form token; no validator is attached to this node.
    secret_token = colander.SchemaNode(
        colander.String(),
        widget=deform.widget.TextInputWidget(
            placeholder='secret_token'
        ),
        default='',
        title=lazy_ugettext('Secret Token'),
        description=lazy_ugettext(
            'String used to validate received payloads.'),
    )
class WebhookIntegrationType(IntegrationTypeBase):
    """
    Integration type that forwards selected events to a webhook URL.
    """
    key = 'webhook'
    display_name = lazy_ugettext('Webhook')

    # Event classes this integration accepts; ``send_event`` drops any
    # event whose class is not listed here.
    valid_events = [
        events.PullRequestCloseEvent,
        events.PullRequestMergeEvent,
        events.PullRequestUpdateEvent,
        events.PullRequestCommentEvent,
        events.PullRequestReviewEvent,
        events.PullRequestCreateEvent,
        events.RepoPushEvent,
        events.RepoCreateEvent,
    ]

    def settings_schema(self):
        """
        Build the settings schema for this integration.

        :returns: a ``WebhookSettingsSchema`` instance extended with an
            ``events`` set node, rendered as one checkbox per entry of
            ``valid_events`` (sorted by the ``(name, display_name)`` pairs).
        """
        schema = WebhookSettingsSchema()
        schema.add(colander.SchemaNode(
            colander.Set(),
            widget=deform.widget.CheckboxChoiceWidget(
                values=sorted(
                    [(e.name, e.display_name) for e in self.valid_events]
                )
            ),
            description="Events activated for this integration",
            name='events'
        ))
        return schema

    def send_event(self, event):
        """
        Post ``event`` to the configured webhook when it is enabled.

        The event is skipped (with a debug log entry) when its class is not
        in ``valid_events`` or when its name is not listed in
        ``self.settings['events']``.

        :param event: event object exposing ``name`` and ``as_dict()``.
        """
        log.debug('handling event %s with webhook integration %s',
                  event.name, self)

        if event.__class__ not in self.valid_events:
            # Pass the event as a logger argument so the message is only
            # formatted when debug logging is actually enabled.
            log.debug('event not valid: %r', event)
            return

        if event.name not in self.settings['events']:
            log.debug('event ignored: %r', event)
            return

        data = event.as_dict()
        # NOTE(review): the task is invoked as a plain call rather than via
        # ``.delay()`` -- confirm that in-process execution is intended.
        post_to_webhook(data, self.settings)
@task(ignore_result=True)
def post_to_webhook(data, settings):
    """
    POST an event payload as JSON to the webhook URL found in ``settings``.

    The JSON body has two members: ``token`` (``settings['secret_token']``)
    and ``event`` (``data``).

    :param data: event dictionary; its ``name`` key is used for logging.
    :param settings: mapping providing the ``url`` and ``secret_token`` keys.
    :raises requests.exceptions.RequestException: on connection problems,
        when the request times out, or when the endpoint answers with an
        HTTP error status.
    """
    # Without an explicit timeout ``requests`` waits indefinitely, so an
    # unresponsive endpoint would block the caller forever.
    request_timeout = 20  # seconds

    log.debug('sending event:%s to webhook %s', data['name'], settings['url'])
    payload = {
        'token': settings['secret_token'],
        'event': data
    }
    resp = requests.post(
        settings['url'], json=payload, timeout=request_timeout)
    resp.raise_for_status()  # raise exception on a failed request