# Copyright (C) 2012-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import colander
import string
import collections
import logging
import requests
import urllib.request
import urllib.parse
import urllib.error

from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

from mako import exceptions

from rhodecode.lib.utils2 import safe_str
from rhodecode.translation import _

log = logging.getLogger(__name__)


class UrlTmpl(string.Template):
    """
    ``string.Template`` variant whose substituted values are url-quoted
    """

    def safe_substitute(self, **kws):
        """
        Substitute the given keywords, leaving unknown placeholders intact

        :param kws: placeholder name -> value; each value is converted with
            ``safe_str`` and url-quoted before it is inserted
        :return: the template string with known placeholders replaced
        """
        # url encode the kw for usage in url
        kws = {k: urllib.parse.quote(safe_str(v)) for k, v in kws.items()}
        return super().safe_substitute(**kws)


class IntegrationTypeBase(object):
    """ Base class for IntegrationType plugins """
    is_dummy = False
    description = ''

    @classmethod
    def icon(cls):
        return ''' image/svg+xml '''

    def __init__(self, settings):
        """
        :param settings: dict of settings to be used for the integration
        """
        self.settings = settings

    def settings_schema(self):
        """
        A colander schema of settings for the integration type
        """
        return colander.Schema()

    def event_enabled(self, event):
        """
        Checks if submitted event is enabled based on the plugin settings

        :param event: event object; its ``name`` is looked up in the
            ``events`` entry of the settings
        :return: bool
        """
        # settings may be None (e.g. EEIntegration created without settings),
        # in which case no event is enabled
        allowed_events = (self.settings or {}).get('events') or []
        if event.name not in allowed_events:
            log.debug('event ignored: %r event %s not in allowed set of events %s',
                      event, event.name, allowed_events)
            return False
        return True


class EEIntegration(IntegrationTypeBase):
    """
    Dummy integration type, flagged with ``is_dummy``, carrying only
    a display name and a key
    """
    description = 'Integration available in RhodeCode EE edition.'
    is_dummy = True

    def __init__(self, name, key, settings=None):
        """
        :param name: stored as ``display_name``
        :param key: stored as ``key``
        :param settings: optional settings passed to the base class
        """
        self.display_name = name
        self.key = key
        super().__init__(settings)


# Helpers #
# updating this requires updating the `common_vars` as well.
WEBHOOK_URL_VARS = [
    # GENERAL
    ('General', [
        ('event_name', 'Unique name of the event type, e.g pullrequest-update'),
        ('repo_name', 'Full name of the repository'),
        ('repo_type', 'VCS type of repository'),
        ('repo_id', 'Unique id of repository'),
        ('repo_url', 'Repository url'),
    ]
    ),
    # extra repo fields
    ('Repository', [
        ('extra:', 'Extra repo variables, read from its settings.'),
    ]
    ),
    # special attrs below that we handle, using multi-call
    ('Commit push - Multicalls', [
        ('branch', 'Name of each branch submitted, if any.'),
        ('branch_head', 'Head ID of pushed branch (full sha of last commit), if any.'),
        ('commit_id', 'ID (full sha) of each commit submitted, if any.'),
    ]
    ),
    # pr events vars
    ('Pull request', [
        ('pull_request_id', 'Unique ID of the pull request.'),
        ('pull_request_title', 'Title of the pull request.'),
        ('pull_request_url', 'Pull request url.'),
        ('pull_request_shadow_url', 'Pull request shadow repo clone url.'),
        ('pull_request_commits_uid', 'Calculated UID of all commits inside the PR. '
                                     'Changes after PR update'),
    ]
    ),
    # commit comment event vars
    ('Commit comment', [
        ('commit_comment_id', 'Unique ID of the comment made on a commit.'),
        ('commit_comment_text', 'Text of commit comment.'),
        ('commit_comment_type', 'Type of comment, e.g note/todo.'),
        ('commit_comment_f_path', 'Optionally path of file for inline comments.'),
        ('commit_comment_line_no', 'Line number of the file: eg o10, or n200'),
        ('commit_comment_commit_id', 'Commit id that comment was left at.'),
        ('commit_comment_commit_branch', 'Commit branch that comment was left at'),
        ('commit_comment_commit_message', 'Commit message that comment was left at'),
    ]
    ),
    # user who triggers the call
    ('Caller', [
        ('username', 'User who triggered the call.'),
        ('user_id', 'User id who triggered the call.'),
    ]
    ),
]

# common vars for url template used for CI plugins. Shared with webhook
CI_URL_VARS = WEBHOOK_URL_VARS


def _substitute_vars(url, variables):
    """
    Substitute variables into an url template, one variable per pass

    :param url: url template string
    :param variables: iterable of ``(name, value)`` pairs
    :return: url with the given placeholders replaced by url-quoted values;
        placeholders not listed in ``variables`` are left untouched
    """
    for name, value in variables:
        url = UrlTmpl(url).safe_substitute(**{name: value})
    return url


class CommitParsingDataHandler(object):
    """
    Helper for grouping pushed commits by the branch they belong to
    """

    def aggregate_branch_data(self, branches, commits):
        """
        Group commits per branch, keeping the order of first appearance

        :param branches: iterable of branch dicts, each with a ``name`` key
        :param commits: iterable of commit dicts with ``branch`` and
            ``raw_id`` keys, and an optional ``git_ref_change`` flag
        :return: OrderedDict mapping branch name to a dict with keys
            ``branch`` (the matching branch dict, or the falsy branch name
            itself), ``branch_head`` (``raw_id`` of the last commit seen for
            that branch) and ``commits`` (list of commit dicts)
        :raises KeyError: when a commit names a branch missing in ``branches``
        """
        branch_data = collections.OrderedDict()
        for obj in branches:
            branch_data[obj['name']] = obj

        branches_commits = collections.OrderedDict()
        for commit in commits:
            if commit.get('git_ref_change'):
                # special case for GIT that allows creating tags,
                # deleting branches without associated commit
                continue

            commit_branch = commit['branch']
            if commit_branch not in branches_commits:
                # a falsy branch name has no entry to look up, keep it as is
                _branch = branch_data[commit_branch] \
                    if commit_branch else commit_branch
                branches_commits[commit_branch] = {
                    'branch': _branch,
                    'branch_head': '',
                    'commits': [],
                }

            branch_commits = branches_commits[commit_branch]
            branch_commits['commits'].append(commit)
            # overwritten on every commit, so the last one ends up as head
            branch_commits['branch_head'] = commit['raw_id']
        return branches_commits


class WebhookDataHandler(CommitParsingDataHandler):
    """
    Turns an event and its data into a list of ``(url, headers, data)``
    calls, by filling the url template with variables taken from the data
    """
    name = 'webhook'

    def __init__(self, template_url, headers):
        """
        :param template_url: url template with ``${var}`` placeholders
        :param headers: headers attached unchanged to every registered call
        """
        self.template_url = template_url
        self.headers = headers

    def get_base_parsed_template(self, data):
        """
        initially parses the passed in template with some common variables
        available on ALL calls

        :param data: event data; ``repo``, ``actor`` and ``name`` are read
        :return: template url with the common and ``extra`` variables filled
        """
        # note: make sure to update the `WEBHOOK_URL_VARS` if this changes
        common_vars = {
            'repo_name': data['repo']['repo_name'],
            'repo_type': data['repo']['repo_type'],
            'repo_id': data['repo']['repo_id'],
            'repo_url': data['repo']['url'],
            'username': data['actor']['username'],
            'user_id': data['actor']['user_id'],
            'event_name': data['name']
        }

        extra_vars = {}
        for extra_key, extra_val in data['repo']['extra_fields'].items():
            extra_vars[f'extra__{extra_key}'] = extra_val
        common_vars.update(extra_vars)

        # `${extra:key}` placeholders are rewritten to the `extra__key` form
        # used for the variable names above
        template_url = self.template_url.replace('${extra:', '${extra__')
        return _substitute_vars(template_url, common_vars.items())

    def repo_push_event_handler(self, event, data):
        """
        Register calls for a push event

        When the url uses ``${branch}``, ``${branch_head}`` or
        ``${commit_id}`` one call is registered per branch, or per commit of
        each branch if ``${commit_id}`` is used. Otherwise a single call is
        registered.

        :param event: the event, used for logging only
        :param data: event data; ``push.branches`` and ``push.commits``
            are read
        :return: list of ``(url, headers, data)`` tuples
        """
        url = self.get_base_parsed_template(data)
        url_calls = []

        branches_commits = self.aggregate_branch_data(
            data['push']['branches'], data['push']['commits'])

        if '${branch}' in url or '${branch_head}' in url or '${commit_id}' in url:
            # call it multiple times, for each branch if used in variables
            for branch, branch_info in branches_commits.items():
                branch_url = UrlTmpl(url).safe_substitute(branch=branch)

                if '${branch_head}' in branch_url:
                    # last commit in the aggregate is the head of the branch
                    branch_head = branch_info['branch_head']
                    branch_url = UrlTmpl(branch_url).safe_substitute(
                        branch_head=branch_head)

                # call further down for each commit if used
                if '${commit_id}' in branch_url:
                    for commit_data in branch_info['commits']:
                        commit_id = commit_data['raw_id']
                        commit_url = UrlTmpl(branch_url).safe_substitute(
                            commit_id=commit_id)
                        # register per-commit call
                        log.debug(
                            'register %s call(%s) to url %s',
                            self.name, event, commit_url)
                        url_calls.append(
                            (commit_url, self.headers, data))
                else:
                    # register per-branch call
                    log.debug('register %s call(%s) to url %s',
                              self.name, event, branch_url)
                    url_calls.append((branch_url, self.headers, data))
        else:
            log.debug('register %s call(%s) to url %s', self.name, event, url)
            url_calls.append((url, self.headers, data))

        return url_calls

    def _commit_comment_calls(self, event, data):
        """
        Shared implementation for the commit comment create/edit handlers

        :param event: the event, used for logging only
        :param data: event data; ``comment`` and ``commit`` are read
        :return: list with a single ``(url, headers, data)`` tuple
        """
        url = self.get_base_parsed_template(data)
        log.debug('register %s call(%s) to url %s', self.name, event, url)
        comment_vars = [
            ('commit_comment_id', data['comment']['comment_id']),
            ('commit_comment_text', data['comment']['comment_text']),
            ('commit_comment_type', data['comment']['comment_type']),

            ('commit_comment_f_path', data['comment']['comment_f_path']),
            ('commit_comment_line_no', data['comment']['comment_line_no']),

            ('commit_comment_commit_id', data['commit']['commit_id']),
            ('commit_comment_commit_branch', data['commit']['commit_branch']),
            ('commit_comment_commit_message', data['commit']['commit_message']),
        ]
        url = _substitute_vars(url, comment_vars)
        return [(url, self.headers, data)]

    def repo_commit_comment_handler(self, event, data):
        """
        Register a call for a commit comment event

        :return: list with a single ``(url, headers, data)`` tuple
        """
        return self._commit_comment_calls(event, data)

    def repo_commit_comment_edit_handler(self, event, data):
        """
        Register a call for a commit comment edit event

        :return: list with a single ``(url, headers, data)`` tuple
        """
        return self._commit_comment_calls(event, data)

    def repo_create_event_handler(self, event, data):
        """
        Register a call for a repo create event, using common variables only

        :return: list with a single ``(url, headers, data)`` tuple
        """
        url = self.get_base_parsed_template(data)
        log.debug('register %s call(%s) to url %s', self.name, event, url)
        return [(url, self.headers, data)]

    def pull_request_event_handler(self, event, data):
        """
        Register a call for a pull request event

        :param event: the event, used for logging only
        :param data: event data; ``pullrequest`` is read
        :return: list with a single ``(url, headers, data)`` tuple
        """
        url = self.get_base_parsed_template(data)
        log.debug('register %s call(%s) to url %s', self.name, event, url)
        pr_vars = [
            ('pull_request_id', data['pullrequest']['pull_request_id']),
            ('pull_request_title', data['pullrequest']['title']),
            ('pull_request_url', data['pullrequest']['url']),
            ('pull_request_shadow_url', data['pullrequest']['shadow_url']),
            ('pull_request_commits_uid', data['pullrequest']['commits_uid']),
        ]
        url = _substitute_vars(url, pr_vars)
        return [(url, self.headers, data)]

    def __call__(self, event, data):
        """
        Dispatch the event to the handler matching its type

        :return: list of ``(url, headers, data)`` tuples
        :raises ValueError: when no handler exists for the event type
        """
        from rhodecode import events

        if isinstance(event, events.RepoPushEvent):
            return self.repo_push_event_handler(event, data)
        elif isinstance(event, events.RepoCreateEvent):
            return self.repo_create_event_handler(event, data)
        elif isinstance(event, events.RepoCommitCommentEvent):
            return self.repo_commit_comment_handler(event, data)
        elif isinstance(event, events.RepoCommitCommentEditEvent):
            return self.repo_commit_comment_edit_handler(event, data)
        elif isinstance(event, events.PullRequestEvent):
            return self.pull_request_event_handler(event, data)
        else:
            raise ValueError(
                f'event type `{event.__class__}` has no handler defined')


def get_auth(settings):
    """
    Build basic auth from settings

    :param settings: mapping with optional ``username`` and ``password``
    :return: ``HTTPBasicAuth`` when both values are set, otherwise None
    """
    from requests.auth import HTTPBasicAuth
    username = settings.get('username')
    password = settings.get('password')
    if username and password:
        return HTTPBasicAuth(username, password)
    return None


def get_web_token(settings):
    """
    :param settings: mapping that must contain ``secret_token``
    :return: the ``secret_token`` value
    """
    return settings['secret_token']


def get_url_vars(url_vars):
    """
    Render url variables as help text, one section header per group

    :param url_vars: iterable of ``(section, [(key, explanation), ...])``
    :return: newline joined text listing ``${key} - explanation`` entries
    """
    items = []

    for section, section_items in url_vars:
        items.append(f'\n*{section}*')
        for key, explanation in section_items:
            placeholder = '${' + key + '}'
            items.append(f' {placeholder} - {explanation}')
    return '\n'.join(items)


def render_with_traceback(template, *args, **kwargs):
    """
    Render the template; on failure log mako's text error report and re-raise

    :param template: object with a ``render`` method
    :return: result of ``template.render(*args, **kwargs)``
    """
    try:
        return template.render(*args, **kwargs)
    except Exception:
        log.error(exceptions.text_error_template().render())
        raise


STATUS_400 = (400, 401, 403)
STATUS_500 = (500, 502, 504)


def requests_retry_call(
        retries=3, backoff_factor=0.3, status_forcelist=STATUS_400+STATUS_500,
        session=None):
    """
    Return a requests session with a retrying adapter mounted for
    ``http://`` and ``https://``

    Usage::

        session = requests_retry_call()
        response = session.get('http://example.com')

    :param retries: used as the total, read and connect retry count
    :param backoff_factor: backoff factor passed to ``Retry``
    :param status_forcelist: HTTP status codes passed to ``Retry``
    :param session: existing session to configure; a new one is created
        when not given
    :return: the configured session
    """
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session