# Copyright (C) 2010-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ """ Base for test suite for making push/pull operations. .. important:: You must have git >= 1.8.5 for tests to work fine. With 68b939b git started to redirect things to stderr instead of stdout. """ import logging import os import tempfile import subprocess from rhodecode.lib.str_utils import safe_str from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO DEBUG = True RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log') REPO_GROUP = 'a_repo_group' HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}' GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}' SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}' log = logging.getLogger(__name__) class Command(object): def __init__(self, cwd): self.cwd = cwd self.process = None def execute(self, cmd, *args): """ Runs command on the system with given ``args``. """ command = cmd + ' ' + ' '.join(args) if DEBUG: log.debug('*** CMD %s ***', command) env = dict(os.environ) # Delete coverage variables, as they make the test fail for Mercurial for key in env.keys(): if key.startswith('COV_CORE_'): del env[key] self.process = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.cwd, env=env) stdout, stderr = self.process.communicate() stdout = safe_str(stdout) stderr = safe_str(stderr) if DEBUG: log.debug('STDOUT:%s', stdout) log.debug('STDERR:%s', stderr) return stdout, stderr def assert_returncode_success(self): assert self.process.returncode == 0 def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs): full_name = 'Marcin Kuźminski' email = 'me@email.com' git_ident = f"git config user.name {full_name} && git config user.email {email}" cwd = path = os.path.join(dest) tags = tags or [] name_sequence = next(tempfile._RandomNameSequence()) added_file = os.path.join(path, f'{name_sequence}_setup.py') Command(cwd).execute(f'touch {added_file}') Command(cwd).execute(f'{vcs} add {added_file}') author_str = 'Marcin Kuźminski ' for i in range(kwargs.get('files_no', 3)): cmd = f"""echo 'added_line{i}' >> {added_file}""" Command(cwd).execute(cmd) if vcs == 'hg': cmd = f"""hg commit -m 'committed new {i}' -u '{author_str}' {added_file} """ elif vcs == 'git': cmd = f"""{git_ident} && git commit -m 'committed new {i}' {added_file}""" Command(cwd).execute(cmd) for tag in tags: if vcs == 'hg': Command(cwd).execute( f"""hg tag -m "{tag['commit']}" -u "{author_str}" """, tag['name']) elif vcs == 'git': if tag['commit']: # annotated tag _stdout, _stderr = Command(cwd).execute( f"""{git_ident} && git tag -a {tag['name']} -m "{tag['commit']}" """ ) else: # lightweight tag _stdout, _stderr = Command(cwd).execute( f"""{git_ident} && git tag {tag['name']}""" ) def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs): """ Generate some files, add it to DEST repo and push back vcs is git or hg and defines what VCS we want to make those files for """ git_ident = "git config user.name Marcin Kuźminski && git config user.email me@email.com" cwd = os.path.join(dest) # commit some stuff into this repo _add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs) default_target_branch = { 'git': 'master', 'hg': 'default' }.get(vcs) target_branch = target_branch or default_target_branch # PUSH it back stdout = stderr = None if vcs == 'hg': maybe_new_branch = '' if new_branch: maybe_new_branch = '--new-branch' stdout, stderr = Command(cwd).execute( f'hg push --traceback --verbose {maybe_new_branch} -r {target_branch} {clone_url}' ) elif vcs == 'git': stdout, stderr = Command(cwd).execute( f'{git_ident} && git push --verbose --tags {clone_url} {target_branch}' ) elif vcs == 'svn': username = kwargs.pop('username', '') password = kwargs.pop('password', '') auth = '' if username and password: auth = f'--username {username} --password {password}' stdout, stderr = Command(cwd).execute( f'svn commit --no-auth-cache --non-interactive {auth} -m "pushing to {target_branch}"' ) return stdout, stderr def _check_proper_git_push( stdout, stderr, branch='master', should_set_default_branch=False): # Note: Git is writing most information to stderr intentionally assert 'fatal' not in stderr assert 'rejected' not in stderr assert 'Pushing to' in stderr assert '%s -> %s' % (branch, branch) in stderr if should_set_default_branch: assert "Setting default branch to %s" % branch in stderr else: assert "Setting default branch" not in stderr def _check_proper_hg_push(stdout, stderr, branch='default'): assert 'pushing to' in stdout assert 'searching for changes' in stdout assert 'abort:' not in stderr def _check_proper_svn_push(stdout, stderr): assert 'pushing to' in stdout assert 'searching for changes' in stdout assert 'abort:' not in stderr def _check_proper_clone(stdout, stderr, vcs): if vcs == 'hg': assert 'requesting all changes' in stdout assert 'adding changesets' in stdout assert 'adding manifests' in stdout assert 'adding file changes' in stdout assert stderr == '' if vcs == 'git': assert '' == stdout assert 'Cloning into' in stderr assert 'abort:' not in stderr assert 'fatal:' not in stderr if vcs == 'svn': assert 'dupa' in stdout