# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import ast
import os
import shutil
import logging
import textwrap

import pytest

import rhodecode
import rhodecode.lib

from rhodecode.tests import console_printer

log = logging.getLogger(__name__)


def store_rcextensions(destination, force=False):
    """
    Copy the bundled ``rhodecode.config.rcextensions`` package into
    ``<destination>/rcextensions``.

    :param destination: directory that will contain the ``rcextensions`` folder.
    :param force: when true, an existing ``rcextensions`` folder is removed
        first. When false and the folder already exists, ``shutil.copytree``
        raises ``FileExistsError``.
    """
    # imported lazily, only when a copy is actually requested
    from rhodecode.config import rcextensions

    package_path = rcextensions.__path__[0]

    # Note: rcextensions are looked up based on the path of the ini file
    rcextensions_path = os.path.join(destination, 'rcextensions')

    if force:
        shutil.rmtree(rcextensions_path, ignore_errors=True)
    shutil.copytree(package_path, rcextensions_path)


@pytest.fixture(scope="module")
def rcextensions(request, tmp_storage_location):
    """
    Installs a testing rcextensions pack to ensure they work as expected.

    Fails the test run when an ``rcextensions`` folder is already present in
    ``tmp_storage_location``.
    """
    # Note: rcextensions are looked up based on the path of the ini file
    rcextensions_path = os.path.join(tmp_storage_location, 'rcextensions')

    if os.path.exists(rcextensions_path):
        # pytest.fail raises, so nothing below runs in that case
        pytest.fail(
            "Path for rcextensions already exists, please clean up before "
            f"test run this path: {rcextensions_path}")

    store_rcextensions(tmp_storage_location)


@pytest.fixture(scope='function')
def rcextensions_present(request):
    """
    Return a context-manager class that installs the rcextensions package on
    enter and removes the ``rcextensions`` folder again on exit.
    """

    class RcExtensionsPresent:
        def __init__(self, rcextensions_location):
            self.rcextensions_location = rcextensions_location

        def __enter__(self):
            self.store()
            # allow ``with rcextensions_present(path) as ctx:``
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.cleanup()

        def store(self):
            """Copy the packaged rcextensions into the configured location."""
            store_rcextensions(self.rcextensions_location)

        def cleanup(self):
            """Delete the installed ``rcextensions`` folder."""
            shutil.rmtree(os.path.join(self.rcextensions_location, 'rcextensions'))

    return RcExtensionsPresent


@pytest.fixture(scope='function')
def rcextensions_modification(request):
    """
    example usage::

        hook_name = '_pre_push_hook'
        code = '''
            raise OSError('failed')
            return HookResponse(1, 'FAILED')
        '''
        mods = [
            (hook_name, code),
        ]
        # rhodecode.ini file location, where rcextensions needs to live
        rcstack_location = os.path.dirname(rcstack.config_file)
        with rcextensions_modification(rcstack_location, mods):
            # do some stuff
    """

    class RcextensionsModification:
        """
        Context manager that rewrites the bodies of functions in
        ``rcextensions/hooks.py`` on enter and restores the packaged
        rcextensions on exit.
        """

        def __init__(self, rcextensions_location, mods, create_if_missing=False, force_create=False):
            """
            :param rcextensions_location: directory containing ``rcextensions``.
            :param mods: list of ``(hook_name, method_body)`` tuples.
            :param create_if_missing: install the packaged rcextensions on
                enter when the folder does not exist yet.
            :param force_create: together with ``create_if_missing``, replace
                an already existing folder with the packaged one.
            :raises ValueError: when ``mods`` is not a list.
            """
            if not isinstance(mods, list):
                raise ValueError('mods must be a list of modifications')

            self.force_create = force_create
            self.create_if_missing = create_if_missing
            self.rcextensions_location = rcextensions_location
            self.mods = mods

        def __enter__(self):
            if self.create_if_missing:
                rcextensions_path = os.path.join(self.rcextensions_location, 'rcextensions')
                # Only copy when the folder is really missing (or a fresh copy
                # was explicitly requested); copying over an existing folder
                # without force raises FileExistsError.
                if self.force_create or not os.path.exists(rcextensions_path):
                    store_rcextensions(self.rcextensions_location, force=self.force_create)

            any_applied = False
            try:
                for hook_name, method_body in self.mods:
                    self.modification(hook_name, method_body)
                    any_applied = True
            except Exception:
                # __exit__ is not invoked when __enter__ raises, so undo the
                # modifications already written before propagating the error.
                if any_applied:
                    self.cleanup()
                raise

            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.cleanup()

        def cleanup(self):
            """Replace the rcextensions folder with a fresh copy of the package."""
            # reset rcextensions to "bare" state from the package
            store_rcextensions(self.rcextensions_location, force=True)

        def modification(self, hook_name, method_body):
            """
            Replace the body of the top-level function ``hook_name`` inside
            ``rcextensions/hooks.py`` with ``method_body``.

            :param hook_name: name of a top-level function in ``hooks.py``.
            :param method_body: source of the new body; it is dedented before
                being parsed.
            :raises ValueError: when ``hooks.py`` defines no top-level function
                named ``hook_name``; the file is left untouched in that case.
            """
            rcextensions_path = os.path.join(self.rcextensions_location, 'rcextensions')

            # Load the code from hooks.py
            hooks_filename = os.path.join(rcextensions_path, 'hooks.py')
            with open(hooks_filename, "r", encoding="utf-8") as file:
                tree = ast.parse(file.read())

            # Parse the new code so it can be used as the function body
            new_body = ast.parse(textwrap.dedent(method_body)).body

            # Walk through the top-level nodes to find and modify the function
            replaced = False
            for node in tree.body:
                if isinstance(node, ast.FunctionDef) and node.name == hook_name:
                    node.body = new_body  # Replace the function body with the new body
                    replaced = True

            if not replaced:
                # Do not rewrite the file or report success for a no-op
                raise ValueError(
                    f"Function '{hook_name}' not found in '{hooks_filename}', "
                    f"cannot modify rcextensions")

            # Compile the modified AST to validate it before touching the file
            compile(tree, hooks_filename, "exec")

            # Write the updated code back to hooks.py
            with open(hooks_filename, "w", encoding="utf-8") as file:
                file.write(ast.unparse(tree))  # Requires Python 3.9+

            console_printer(f" [green]rcextensions[/green] Updated the body of '{hooks_filename}' function '{hook_name}'")

    return RcextensionsModification