|
|
# Copyright (C) 2010-2024 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
Base for test suite for making push/pull operations.
|
|
|
|
|
|
.. important::
|
|
|
|
|
|
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
|
|
|
to redirect things to stderr instead of stdout.
|
|
|
"""
|
|
|
|
|
|
|
|
|
import logging
|
|
|
import os
|
|
|
import tempfile
|
|
|
import subprocess
|
|
|
|
|
|
from rhodecode.lib.str_utils import safe_str
|
|
|
from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO
|
|
|
|
|
|
DEBUG = True
|
|
|
REPO_GROUP = 'a_repo_group'
|
|
|
HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
|
|
|
GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
|
|
|
SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class Command(object):
|
|
|
|
|
|
def __init__(self, cwd):
|
|
|
self.cwd = cwd
|
|
|
self.process = None
|
|
|
|
|
|
def execute(self, cmd, *args):
|
|
|
"""
|
|
|
Runs command on the system with given ``args``.
|
|
|
"""
|
|
|
|
|
|
command = cmd + ' ' + ' '.join(args)
|
|
|
if DEBUG:
|
|
|
log.debug('*** CMD %s ***', command)
|
|
|
|
|
|
env = dict(os.environ)
|
|
|
# Delete coverage variables, as they make the test fail for Mercurial
|
|
|
for key in env.keys():
|
|
|
if key.startswith('COV_CORE_'):
|
|
|
del env[key]
|
|
|
|
|
|
self.process = subprocess.Popen(
|
|
|
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
|
cwd=self.cwd, env=env)
|
|
|
stdout, stderr = self.process.communicate()
|
|
|
|
|
|
stdout = safe_str(stdout)
|
|
|
stderr = safe_str(stderr)
|
|
|
|
|
|
if DEBUG:
|
|
|
log.debug('STDOUT:%s', stdout)
|
|
|
log.debug('STDERR:%s', stderr)
|
|
|
return stdout, stderr
|
|
|
|
|
|
def assert_returncode_success(self):
|
|
|
assert self.process.returncode == 0
|
|
|
|
|
|
|
|
|
def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs):
|
|
|
full_name = 'Marcin Kuźminski'
|
|
|
email = 'me@email.com'
|
|
|
git_ident = f"git config user.name {full_name} && git config user.email {email}"
|
|
|
cwd = path = os.path.join(dest)
|
|
|
|
|
|
tags = tags or []
|
|
|
name_sequence = next(tempfile._RandomNameSequence())
|
|
|
added_file = os.path.join(path, f'{name_sequence}_setup.py')
|
|
|
|
|
|
Command(cwd).execute(f'touch {added_file}')
|
|
|
Command(cwd).execute(f'{vcs} add {added_file}')
|
|
|
author_str = 'Marcin Kuźminski <me@email.com>'
|
|
|
|
|
|
for i in range(kwargs.get('files_no', 3)):
|
|
|
cmd = f"""echo 'added_line{i}' >> {added_file}"""
|
|
|
Command(cwd).execute(cmd)
|
|
|
|
|
|
if vcs == 'hg':
|
|
|
cmd = f"""hg commit -m 'committed new {i}' -u '{author_str}' {added_file} """
|
|
|
elif vcs == 'git':
|
|
|
cmd = f"""{git_ident} && git commit -m 'committed new {i}' {added_file}"""
|
|
|
Command(cwd).execute(cmd)
|
|
|
|
|
|
for tag in tags:
|
|
|
if vcs == 'hg':
|
|
|
Command(cwd).execute(
|
|
|
f"""hg tag -m "{tag['commit']}" -u "{author_str}" """,
|
|
|
tag['name'])
|
|
|
elif vcs == 'git':
|
|
|
if tag['commit']:
|
|
|
# annotated tag
|
|
|
_stdout, _stderr = Command(cwd).execute(
|
|
|
f"""{git_ident} && git tag -a {tag['name']} -m "{tag['commit']}" """
|
|
|
)
|
|
|
else:
|
|
|
# lightweight tag
|
|
|
_stdout, _stderr = Command(cwd).execute(
|
|
|
f"""{git_ident} && git tag {tag['name']}"""
|
|
|
)
|
|
|
|
|
|
|
|
|
def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None,
|
|
|
new_branch=False, **kwargs):
|
|
|
"""
|
|
|
Generate some files, add it to DEST repo and push back
|
|
|
vcs is git or hg and defines what VCS we want to make those files for
|
|
|
"""
|
|
|
git_ident = "git config user.name Marcin Kuźminski && git config user.email me@email.com"
|
|
|
cwd = os.path.join(dest)
|
|
|
|
|
|
# commit some stuff into this repo
|
|
|
_add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs)
|
|
|
|
|
|
default_target_branch = {
|
|
|
'git': 'master',
|
|
|
'hg': 'default'
|
|
|
}.get(vcs)
|
|
|
|
|
|
target_branch = target_branch or default_target_branch
|
|
|
|
|
|
# PUSH it back
|
|
|
stdout = stderr = None
|
|
|
if vcs == 'hg':
|
|
|
maybe_new_branch = ''
|
|
|
if new_branch:
|
|
|
maybe_new_branch = '--new-branch'
|
|
|
stdout, stderr = Command(cwd).execute(
|
|
|
f'hg push --traceback --verbose {maybe_new_branch} -r {target_branch} {clone_url}'
|
|
|
)
|
|
|
elif vcs == 'git':
|
|
|
stdout, stderr = Command(cwd).execute(
|
|
|
f'{git_ident} && git push --verbose --tags {clone_url} {target_branch}'
|
|
|
)
|
|
|
elif vcs == 'svn':
|
|
|
username = kwargs.pop('username', '')
|
|
|
password = kwargs.pop('password', '')
|
|
|
auth = ''
|
|
|
if username and password:
|
|
|
auth = f'--username {username} --password {password}'
|
|
|
|
|
|
stdout, stderr = Command(cwd).execute(
|
|
|
f'svn commit --no-auth-cache --non-interactive {auth} -m "pushing to {target_branch}"'
|
|
|
)
|
|
|
|
|
|
return stdout, stderr
|
|
|
|
|
|
|
|
|
def _check_proper_git_push(
|
|
|
stdout, stderr, branch='master', should_set_default_branch=False):
|
|
|
# Note: Git is writing most information to stderr intentionally
|
|
|
assert 'fatal' not in stderr
|
|
|
assert 'rejected' not in stderr
|
|
|
assert 'Pushing to' in stderr
|
|
|
assert '%s -> %s' % (branch, branch) in stderr
|
|
|
|
|
|
if should_set_default_branch:
|
|
|
assert "Setting default branch to %s" % branch in stderr
|
|
|
else:
|
|
|
assert "Setting default branch" not in stderr
|
|
|
|
|
|
|
|
|
def _check_proper_hg_push(stdout, stderr, branch='default'):
|
|
|
assert 'pushing to' in stdout
|
|
|
assert 'searching for changes' in stdout
|
|
|
|
|
|
assert 'abort:' not in stderr
|
|
|
|
|
|
|
|
|
def _check_proper_svn_push(stdout, stderr):
|
|
|
assert 'pushing to' in stdout
|
|
|
assert 'searching for changes' in stdout
|
|
|
|
|
|
assert 'abort:' not in stderr
|
|
|
|
|
|
|
|
|
def _check_proper_clone(stdout, stderr, vcs):
|
|
|
if vcs == 'hg':
|
|
|
assert 'requesting all changes' in stdout
|
|
|
assert 'adding changesets' in stdout
|
|
|
assert 'adding manifests' in stdout
|
|
|
assert 'adding file changes' in stdout
|
|
|
|
|
|
assert stderr == ''
|
|
|
|
|
|
if vcs == 'git':
|
|
|
assert '' == stdout
|
|
|
assert 'Cloning into' in stderr
|
|
|
assert 'abort:' not in stderr
|
|
|
assert 'fatal:' not in stderr
|
|
|
|
|
|
if vcs == 'svn':
|
|
|
assert 'dupa' in stdout
|
|
|
|
|
|
|
|
|
|