# Copyright (C) 2012-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import collections
import logging
import string
import urllib.error
import urllib.parse
import urllib.request

import colander
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util import Retry

from mako import exceptions

from rhodecode.lib.str_utils import safe_str

log = logging.getLogger(__name__)


class UrlTmpl(string.Template):
    """
    ``string.Template`` variant whose ``safe_substitute`` url-quotes every
    substituted value, so the rendered result can be used inside an url.
    """

    def safe_substitute(self, mapping=None, /, **kws):
        """
        Substitute placeholders with url-quoted values.

        :param mapping: optional positional mapping of placeholder values,
            mirroring ``string.Template.safe_substitute``
        :param kws: placeholder values given as keywords; they take
            precedence over entries of `mapping`
        :return: the template text with known placeholders replaced
        """
        values = dict(mapping or {})
        values.update(kws)
        # url encode the values for usage in url
        quoted = {
            k: urllib.parse.quote(safe_str(v)) for k, v in values.items()}
        return super().safe_substitute(quoted)


class IntegrationTypeBase(object):
    """ Base class for IntegrationType plugins """
    is_dummy = False
    description = ''

    @classmethod
    def icon(cls):
        """ Return the icon markup used to represent this integration type """
        return ''' image/svg+xml '''

    def __init__(self, settings):
        """
        :param settings: dict of settings to be used for the integration
        """
        self.settings = settings

    def settings_schema(self):
        """
        A colander schema of settings for the integration type
        """
        return colander.Schema()

    def event_enabled(self, event):
        """
        Checks if submitted event is enabled based on the plugin settings

        :param event: event object exposing a ``name`` attribute
        :return: bool
        """
        # settings may be None (it is the default for EEIntegration), treat
        # that the same as "no events enabled" instead of crashing
        allowed_events = (self.settings or {}).get('events') or []
        if event.name not in allowed_events:
            log.debug('event ignored: %r event %s not in allowed set of events %s',
                      event, event.name, allowed_events)
            return False
        return True


class EEIntegration(IntegrationTypeBase):
    """
    Dummy integration type (``is_dummy`` is True) carrying only a display
    name and a key.
    """
    description = 'Integration available in RhodeCode EE edition.'
    is_dummy = True

    def __init__(self, name, key, settings=None):
        """
        :param name: stored as ``display_name``
        :param key: stored as ``key``
        :param settings: optional settings passed to the base class
        """
        self.display_name = name
        self.key = key
        super().__init__(settings)


# Helpers #
# NOTE: updating this requires updating the `common_vars` as well.
WEBHOOK_URL_VARS = [
    # GENERAL
    ('General', [
        ('event_name', 'Unique name of the event type, e.g pullrequest-update'),
        ('repo_name', 'Full name of the repository'),
        ('repo_type', 'VCS type of repository'),
        ('repo_id', 'Unique id of repository'),
        ('repo_url', 'Repository url'),
        ]
     ),
    # extra repo fields
    ('Repository', [
        ('extra:', 'Extra repo variables, read from its settings.'),
        ]
     ),
    # special attrs below that we handle, using multi-call
    ('Commit push - Multicalls', [
        ('branch', 'Name of each branch submitted, if any.'),
        ('branch_head', 'Head ID of pushed branch (full sha of last commit), if any.'),
        ('commit_id', 'ID (full sha) of each commit submitted, if any.'),
        ]
     ),
    # pr events vars
    ('Pull request', [
        ('pull_request_id', 'Unique ID of the pull request.'),
        ('pull_request_title', 'Title of the pull request.'),
        ('pull_request_url', 'Pull request url.'),
        ('pull_request_shadow_url', 'Pull request shadow repo clone url.'),
        ('pull_request_commits_uid', 'Calculated UID of all commits inside the PR. '
                                     'Changes after PR update'),
        ]
     ),
    # commit comment event vars
    ('Commit comment', [
        ('commit_comment_id', 'Unique ID of the comment made on a commit.'),
        ('commit_comment_text', 'Text of commit comment.'),
        ('commit_comment_type', 'Type of comment, e.g note/todo.'),
        ('commit_comment_f_path', 'Optionally path of file for inline comments.'),
        ('commit_comment_line_no', 'Line number of the file: eg o10, or n200'),
        ('commit_comment_commit_id', 'Commit id that comment was left at.'),
        ('commit_comment_commit_branch', 'Commit branch that comment was left at'),
        ('commit_comment_commit_message', 'Commit message that comment was left at'),
        ]
     ),
    # user who triggers the call
    ('Caller', [
        ('username', 'User who triggered the call.'),
        ('user_id', 'User id who triggered the call.'),
        ]
     ),
]

# common vars for url template used for CI plugins. Shared with webhook
CI_URL_VARS = WEBHOOK_URL_VARS


class CommitParsingDataHandler(object):
    """ Helper grouping pushed commits by the branch they belong to """

    def aggregate_branch_data(self, branches, commits):
        """
        Group `commits` per branch, preserving first-seen branch order.

        :param branches: iterable of branch dicts, each with a ``name`` key
        :param commits: iterable of commit dicts with ``branch`` and
            ``raw_id`` keys, and optionally ``git_ref_change``
        :return: OrderedDict mapping the commit branch name to a dict with
            keys ``branch`` (the matching entry of `branches`),
            ``branch_head`` (``raw_id`` of the last commit seen for that
            branch) and ``commits`` (list of the commits of that branch)
        """
        branch_data = collections.OrderedDict()
        for obj in branches:
            branch_data[obj['name']] = obj

        branches_commits = collections.OrderedDict()
        for commit in commits:
            if commit.get('git_ref_change'):
                # special case for GIT that allows creating tags,
                # deleting branches without associated commit
                continue
            commit_branch = commit['branch']

            if commit_branch not in branches_commits:
                # a falsy branch name has no entry to look up, keep it as is
                _branch = branch_data[commit_branch] \
                    if commit_branch else commit_branch
                branch_commits = {'branch': _branch,
                                  'branch_head': '',
                                  'commits': []}
                branches_commits[commit_branch] = branch_commits

            branch_commits = branches_commits[commit_branch]
            branch_commits['commits'].append(commit)
            # overwritten on every commit, so the last one seen wins
            branch_commits['branch_head'] = commit['raw_id']
        return branches_commits


def get_auth(settings):
    """
    Build HTTP basic auth from the ``username`` and ``password`` settings.

    :param settings: dict-like object of integration settings
    :return: ``HTTPBasicAuth`` when both values are set, otherwise None
    """
    from requests.auth import HTTPBasicAuth
    username = settings.get('username')
    password = settings.get('password')
    if username and password:
        return HTTPBasicAuth(username, password)
    return None


def get_web_token(settings):
    """
    Return the ``secret_token`` entry of `settings`.

    The entry is read by subscript, so a missing key is not defaulted.
    """
    return settings['secret_token']


def get_url_vars(url_vars):
    """
    Render a text listing of url template variables.

    :param url_vars: iterable of ``(section, [(key, explanation), ...])``,
        e.g. `WEBHOOK_URL_VARS`
    :return: newline joined text with a ``*section*`` header per section
        followed by one ``${key} - explanation`` line per variable
    """
    items = []

    for section, section_items in url_vars:
        items.append(f'\n*{section}*')
        for key, explanation in section_items:
            placeholder = '${' + key + '}'
            items.append(f' {placeholder} - {explanation}')
    return '\n'.join(items)


def render_with_traceback(template, *args, **kwargs):
    """
    Render `template`, logging mako's text error report if rendering fails.

    The exception is always re-raised after being logged.
    """
    try:
        return template.render(*args, **kwargs)
    except Exception:
        log.error(exceptions.text_error_template().render())
        raise


STATUS_400 = (400, 401, 403)
STATUS_500 = (500, 502, 504)


def requests_retry_call(
        retries=3, backoff_factor=0.3, status_forcelist=STATUS_400+STATUS_500,
        session=None):
    """
    Return a requests session with a retrying adapter mounted for
    ``http://`` and ``https://``. Usage::

        session = requests_retry_call()
        response = session.get('http://example.com')

    :param retries: value used for the ``total``, ``read`` and ``connect``
        arguments of ``Retry``
    :param backoff_factor: passed to ``Retry`` as ``backoff_factor``
    :param status_forcelist: HTTP status codes passed to ``Retry`` as
        ``status_forcelist``
    :param session: existing session to mount the adapter on; a new
        ``requests.Session`` is created when not given
    :return: the session with the adapter mounted
    """
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session