# Copyright (C) 2012-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import colander
import string
import collections
import logging
import requests
import urllib.request
import urllib.parse
import urllib.error
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from mako import exceptions
from rhodecode.lib.utils2 import safe_str
from rhodecode.translation import _
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class UrlTmpl(string.Template):
    """``string.Template`` variant that url-quotes every substituted value."""

    def safe_substitute(self, **kws):
        """
        Substitute placeholders after url-encoding the given values.

        Each value is converted with ``safe_str`` and passed through
        ``urllib.parse.quote`` before the stock ``safe_substitute`` runs, so
        placeholders without a matching keyword are left untouched.

        :param kws: placeholder name -> raw value
        :return: template text with the known placeholders replaced
        """
        quoted = {}
        for placeholder, raw_value in kws.items():
            # url encode the value for usage in url
            quoted[placeholder] = urllib.parse.quote(safe_str(raw_value))
        return super().safe_substitute(**quoted)
class IntegrationTypeBase(object):
    """ Base class for IntegrationType plugins """
    is_dummy = False
    description = ''

    @classmethod
    def icon(cls):
        """
        Icon of the integration type as a string.

        The base class only provides a blank placeholder.
        """
        return '''
'''

    def __init__(self, settings):
        """
        :param settings: dict of settings to be used for the integration
        """
        self.settings = settings

    def settings_schema(self):
        """
        A colander schema of settings for the integration type
        """
        return colander.Schema()

    def event_enabled(self, event):
        """
        Checks if submitted event is enabled based on the plugin settings

        :param event: event object; only its ``name`` attribute is inspected
        :return: bool, True when ``event.name`` is listed under the
            ``events`` key of the settings
        """
        # settings may be None (EEIntegration defaults to settings=None);
        # treat that as "no events enabled" instead of raising AttributeError
        settings = self.settings or {}
        allowed_events = settings.get('events') or []
        if event.name not in allowed_events:
            log.debug('event ignored: %r event %s not in allowed set of events %s',
                      event, event.name, allowed_events)
            return False
        return True
class EEIntegration(IntegrationTypeBase):
    """Dummy integration type standing in for an EE-only integration."""
    is_dummy = True
    description = 'Integration available in RhodeCode EE edition.'

    def __init__(self, name, key, settings=None):
        """
        :param name: stored as ``display_name``
        :param key: stored as ``key``
        :param settings: dict of settings, forwarded to the base class
        """
        super().__init__(settings)
        self.key = key
        self.display_name = name
# Helpers #

# Variables documented for use in a webhook url template, grouped as
# (section title, [(variable name, description), ...]) pairs.
# NOTE: when this list is updated, the `common_vars` built by
# `WebhookDataHandler.get_base_parsed_template` must be updated as well.
WEBHOOK_URL_VARS = [
    # variables available on all calls
    ('General', [
        ('event_name', 'Unique name of the event type, e.g pullrequest-update'),
        ('repo_name', 'Full name of the repository'),
        ('repo_type', 'VCS type of repository'),
        ('repo_id', 'Unique id of repository'),
        ('repo_url', 'Repository url'),
    ]
    ),
    # extra repo fields
    ('Repository', [
        ('extra:', 'Extra repo variables, read from its settings.'),
    ]
    ),
    # special attributes handled below by issuing multiple calls
    ('Commit push - Multicalls', [
        ('branch', 'Name of each branch submitted, if any.'),
        ('branch_head', 'Head ID of pushed branch (full sha of last commit), if any.'),
        ('commit_id', 'ID (full sha) of each commit submitted, if any.'),
    ]
    ),
    # pull request event variables
    ('Pull request', [
        ('pull_request_id', 'Unique ID of the pull request.'),
        ('pull_request_title', 'Title of the pull request.'),
        ('pull_request_url', 'Pull request url.'),
        ('pull_request_shadow_url', 'Pull request shadow repo clone url.'),
        ('pull_request_commits_uid', 'Calculated UID of all commits inside the PR. '
                                     'Changes after PR update'),
    ]
    ),
    # commit comment event variables
    ('Commit comment', [
        ('commit_comment_id', 'Unique ID of the comment made on a commit.'),
        ('commit_comment_text', 'Text of commit comment.'),
        ('commit_comment_type', 'Type of comment, e.g note/todo.'),
        ('commit_comment_f_path', 'Optionally path of file for inline comments.'),
        ('commit_comment_line_no', 'Line number of the file: eg o10, or n200'),
        ('commit_comment_commit_id', 'Commit id that comment was left at.'),
        ('commit_comment_commit_branch', 'Commit branch that comment was left at'),
        ('commit_comment_commit_message', 'Commit message that comment was left at'),
    ]
    ),
    # user who triggers the call
    ('Caller', [
        ('username', 'User who triggered the call.'),
        ('user_id', 'User id who triggered the call.'),
    ]
    ),
]
# Common variables for the url template used by CI plugins. This is the very
# same list object as WEBHOOK_URL_VARS, not a copy.
CI_URL_VARS = WEBHOOK_URL_VARS
class CommitParsingDataHandler(object):
    """Groups pushed commits by the branch they were submitted on."""

    def aggregate_branch_data(self, branches, commits):
        """
        Build an ordered mapping of branch name -> aggregated push data.

        :param branches: iterable of branch dicts, each having a ``name`` key
        :param commits: iterable of commit dicts with ``branch`` and
            ``raw_id`` keys; commits with a truthy ``git_ref_change`` are
            skipped
        :return: OrderedDict keyed by the commit's branch name, in order of
            first appearance. Each value is a dict with ``branch`` (the
            matching branch dict, or the falsy branch name itself),
            ``branch_head`` (``raw_id`` of the last commit seen for the
            branch) and ``commits`` (list of the branch's commit dicts)
        """
        known_branches = collections.OrderedDict(
            (branch['name'], branch) for branch in branches)

        grouped = collections.OrderedDict()
        for commit in commits:
            # special case for GIT that allows creating tags,
            # deleting branches without associated commit
            if commit.get('git_ref_change'):
                continue

            branch_name = commit['branch']
            entry = grouped.get(branch_name)
            if entry is None:
                # first commit seen for this branch, open a new bucket
                branch_obj = known_branches[branch_name] if branch_name else branch_name
                entry = grouped[branch_name] = {
                    'branch': branch_obj,
                    'branch_head': '',
                    'commits': [],
                }

            entry['commits'].append(commit)
            # the most recently seen commit is taken as the branch head
            entry['branch_head'] = commit['raw_id']
        return grouped
class WebhookDataHandler(CommitParsingDataHandler):
    """
    Turns an event and its data into a list of webhook calls.

    Calling an instance returns a list of ``(url, headers, data)`` tuples,
    where ``url`` is ``template_url`` with the variables known for the given
    event substituted through ``UrlTmpl``.
    """
    name = 'webhook'

    def __init__(self, template_url, headers):
        """
        :param template_url: url template using ``${var}`` placeholders
        :param headers: headers passed along unchanged with every call
        """
        self.template_url = template_url
        self.headers = headers

    @staticmethod
    def _substitute_vars(url, url_vars):
        """
        Substitute ``url_vars`` into ``url``, one variable per pass.

        :param url: url template
        :param url_vars: iterable of ``(name, value)`` pairs
        :return: url with the given variables replaced
        """
        # deliberately one pass per variable: this is how the handlers have
        # always substituted, and every pass also processes `$$` escapes
        for key, value in url_vars:
            url = UrlTmpl(url).safe_substitute(**{key: value})
        return url

    def get_base_parsed_template(self, data):
        """
        initially parses the passed in template with some common variables
        available on ALL calls

        :param data: event data; reads ``repo``, ``actor`` and ``name``
        :return: template url with the common variables substituted
        """
        # note: make sure to update the `WEBHOOK_URL_VARS` if this changes
        common_vars = {
            'repo_name': data['repo']['repo_name'],
            'repo_type': data['repo']['repo_type'],
            'repo_id': data['repo']['repo_id'],
            'repo_url': data['repo']['url'],
            'username': data['actor']['username'],
            'user_id': data['actor']['user_id'],
            'event_name': data['name']
        }

        for extra_key, extra_val in data['repo']['extra_fields'].items():
            common_vars['extra__{}'.format(extra_key)] = extra_val

        # `${extra:key}` is not a valid Template identifier, so it is
        # rewritten to the `extra__key` names registered above
        template_url = self.template_url.replace('${extra:', '${extra__')
        return self._substitute_vars(template_url, common_vars.items())

    def repo_push_event_handler(self, event, data):
        """
        Build the calls for a push event.

        When the url uses ``${branch}``, ``${branch_head}`` or
        ``${commit_id}``, one call is registered per pushed branch, or per
        commit if ``${commit_id}`` is used; otherwise a single call is made.

        :return: list of ``(url, headers, data)`` tuples
        """
        url = self.get_base_parsed_template(data)
        url_calls = []

        branches_commits = self.aggregate_branch_data(
            data['push']['branches'], data['push']['commits'])

        multicall_vars = ('${branch}', '${branch_head}', '${commit_id}')
        if not any(var in url for var in multicall_vars):
            log.debug('register %s call(%s) to url %s', self.name, event, url)
            url_calls.append((url, self.headers, data))
            return url_calls

        # call it multiple times, for each branch if used in variables
        for branch, branch_commits in branches_commits.items():
            branch_url = UrlTmpl(url).safe_substitute(branch=branch)

            if '${branch_head}' in branch_url:
                # last commit in the aggregate is the head of the branch
                branch_head = branch_commits['branch_head']
                branch_url = UrlTmpl(branch_url).safe_substitute(branch_head=branch_head)

            # call further down for each commit if used
            if '${commit_id}' in branch_url:
                for commit_data in branch_commits['commits']:
                    commit_id = commit_data['raw_id']
                    commit_url = UrlTmpl(branch_url).safe_substitute(commit_id=commit_id)
                    # register per-commit call
                    log.debug(
                        'register %s call(%s) to url %s',
                        self.name, event, commit_url)
                    url_calls.append(
                        (commit_url, self.headers, data))
            else:
                # register per-branch call
                log.debug('register %s call(%s) to url %s',
                          self.name, event, branch_url)
                url_calls.append((branch_url, self.headers, data))

        return url_calls

    def _commit_comment_calls(self, event, data):
        """
        Shared implementation of the commit comment and comment-edit handlers.

        :return: single-item list with the ``(url, headers, data)`` call
        """
        url = self.get_base_parsed_template(data)
        comment_vars = [
            ('commit_comment_id', data['comment']['comment_id']),
            ('commit_comment_text', data['comment']['comment_text']),
            ('commit_comment_type', data['comment']['comment_type']),
            ('commit_comment_f_path', data['comment']['comment_f_path']),
            ('commit_comment_line_no', data['comment']['comment_line_no']),
            ('commit_comment_commit_id', data['commit']['commit_id']),
            ('commit_comment_commit_branch', data['commit']['commit_branch']),
            ('commit_comment_commit_message', data['commit']['commit_message']),
        ]
        url = self._substitute_vars(url, comment_vars)
        # log the final url, consistent with the push handler
        log.debug('register %s call(%s) to url %s', self.name, event, url)
        return [(url, self.headers, data)]

    def repo_commit_comment_handler(self, event, data):
        """
        Build the call for a commit comment event.

        :return: single-item list with the ``(url, headers, data)`` call
        """
        return self._commit_comment_calls(event, data)

    def repo_commit_comment_edit_handler(self, event, data):
        """
        Build the call for a commit comment edit event.

        :return: single-item list with the ``(url, headers, data)`` call
        """
        return self._commit_comment_calls(event, data)

    def repo_create_event_handler(self, event, data):
        """
        Build the call for a repository creation event.

        :return: single-item list with the ``(url, headers, data)`` call
        """
        url = self.get_base_parsed_template(data)
        log.debug('register %s call(%s) to url %s', self.name, event, url)
        return [(url, self.headers, data)]

    def pull_request_event_handler(self, event, data):
        """
        Build the call for a pull request event.

        :return: single-item list with the ``(url, headers, data)`` call
        """
        url = self.get_base_parsed_template(data)
        pr_vars = [
            ('pull_request_id', data['pullrequest']['pull_request_id']),
            ('pull_request_title', data['pullrequest']['title']),
            ('pull_request_url', data['pullrequest']['url']),
            ('pull_request_shadow_url', data['pullrequest']['shadow_url']),
            ('pull_request_commits_uid', data['pullrequest']['commits_uid']),
        ]
        url = self._substitute_vars(url, pr_vars)
        # log the final url, consistent with the push handler
        log.debug('register %s call(%s) to url %s', self.name, event, url)
        return [(url, self.headers, data)]

    def __call__(self, event, data):
        """
        Dispatch ``event`` to the handler matching its type.

        :param event: event instance from ``rhodecode.events``
        :param data: event data handed to the handler
        :return: list of ``(url, headers, data)`` tuples
        :raises ValueError: when no handler is defined for the event type
        """
        from rhodecode import events

        if isinstance(event, events.RepoPushEvent):
            return self.repo_push_event_handler(event, data)
        elif isinstance(event, events.RepoCreateEvent):
            return self.repo_create_event_handler(event, data)
        elif isinstance(event, events.RepoCommitCommentEvent):
            return self.repo_commit_comment_handler(event, data)
        elif isinstance(event, events.RepoCommitCommentEditEvent):
            return self.repo_commit_comment_edit_handler(event, data)
        elif isinstance(event, events.PullRequestEvent):
            return self.pull_request_event_handler(event, data)
        else:
            raise ValueError(
                'event type `{}` has no handler defined'.format(event.__class__))
def get_auth(settings):
    """
    Build HTTP basic auth credentials from integration settings.

    :param settings: mapping read via ``.get`` for ``username`` and
        ``password``
    :return: ``HTTPBasicAuth`` instance when both values are truthy,
        otherwise None
    """
    from requests.auth import HTTPBasicAuth

    credentials = (settings.get('username'), settings.get('password'))
    if not all(credentials):
        return None
    return HTTPBasicAuth(*credentials)
def get_web_token(settings):
    """
    Read the secret token from integration settings.

    :param settings: mapping that must contain the ``secret_token`` key
    :return: value stored under ``secret_token``
    :raises KeyError: when ``secret_token`` is missing
    """
    token = settings['secret_token']
    return token
def get_url_vars(url_vars):
    """
    Render url variable documentation as plain text.

    :param url_vars: iterable of ``(section, [(key, explanation), ...])``
        pairs, e.g. ``WEBHOOK_URL_VARS``
    :return: newline-joined text with a ``*section*`` heading (preceded by an
        empty line) per section and one ``${key} - explanation`` line per
        variable
    """
    lines = []
    for section, section_items in url_vars:
        lines.append('\n*{}*'.format(section))
        lines.extend(
            ' {} - {}'.format('${' + key + '}', explanation)
            for key, explanation in section_items)
    return '\n'.join(lines)
def render_with_traceback(template, *args, **kwargs):
    """
    Render ``template``, logging mako's text error report on failure.

    :param template: object exposing ``render(*args, **kwargs)``
    :param args: positional arguments forwarded to ``render``
    :param kwargs: keyword arguments forwarded to ``render``
    :return: whatever ``template.render`` returns
    :raises Exception: the original error is re-raised after being logged
    """
    try:
        rendered = template.render(*args, **kwargs)
    except Exception:
        error_report = exceptions.text_error_template().render()
        log.error(error_report)
        raise
    return rendered
# HTTP response status codes; their concatenation is the default
# `status_forcelist` of `requests_retry_call`.
STATUS_400 = (400, 401, 403)
STATUS_500 = (500, 502, 504)
def requests_retry_call(
        retries=3, backoff_factor=0.3, status_forcelist=STATUS_400+STATUS_500,
        session=None):
    """
    Return a requests session with a retrying adapter mounted for http/https.

    Usage::

        session = requests_retry_call()
        response = session.get('http://example.com')

    :param retries: used as the total, read and connect limits of ``Retry``
    :param backoff_factor: backoff factor handed to ``Retry``
    :param status_forcelist: status codes handed to ``Retry`` as
        ``status_forcelist``
    :param session: existing session to configure; a new
        ``requests.Session`` is created when none is given
    :return: the configured session
    """
    if not session:
        session = requests.Session()

    retry_policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    # the same adapter instance serves both schemes
    for scheme in ('http://', 'https://'):
        session.mount(scheme, retrying_adapter)
    return session