|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
# Copyright (C) 2010-2020 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
Base for test suite for making push/pull operations.
|
|
|
|
|
|
.. important::
|
|
|
|
|
|
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
|
|
|
to redirect things to stderr instead of stdout.
|
|
|
"""
|
|
|
|
|
|
from os.path import join as jn
|
|
|
from subprocess import Popen, PIPE
|
|
|
import logging
|
|
|
import os
|
|
|
import tempfile
|
|
|
|
|
|
from rhodecode.lib.str_utils import safe_str
|
|
|
from rhodecode.tests import GIT_REPO, HG_REPO
|
|
|
|
|
|
DEBUG = True
|
|
|
RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log')
|
|
|
REPO_GROUP = 'a_repo_group'
|
|
|
HG_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, HG_REPO)
|
|
|
GIT_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, GIT_REPO)
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class Command(object):
|
|
|
|
|
|
def __init__(self, cwd):
|
|
|
self.cwd = cwd
|
|
|
self.process = None
|
|
|
|
|
|
def execute(self, cmd, *args):
|
|
|
"""
|
|
|
Runs command on the system with given ``args``.
|
|
|
"""
|
|
|
|
|
|
command = cmd + ' ' + ' '.join(args)
|
|
|
if DEBUG:
|
|
|
log.debug('*** CMD %s ***', command)
|
|
|
|
|
|
env = dict(os.environ)
|
|
|
# Delete coverage variables, as they make the test fail for Mercurial
|
|
|
for key in env.keys():
|
|
|
if key.startswith('COV_CORE_'):
|
|
|
del env[key]
|
|
|
|
|
|
self.process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE,
|
|
|
cwd=self.cwd, env=env)
|
|
|
stdout, stderr = self.process.communicate()
|
|
|
|
|
|
stdout = safe_str(stdout)
|
|
|
stderr = safe_str(stderr)
|
|
|
|
|
|
if DEBUG:
|
|
|
log.debug('STDOUT:%s', stdout)
|
|
|
log.debug('STDERR:%s', stderr)
|
|
|
return stdout, stderr
|
|
|
|
|
|
def assert_returncode_success(self):
|
|
|
assert self.process.returncode == 0
|
|
|
|
|
|
|
|
|
def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs):
|
|
|
full_name = 'Marcin Kuźminski'
|
|
|
email = 'me@email.com'
|
|
|
git_ident = f"git config user.name {full_name} && git config user.email {email}"
|
|
|
cwd = path = jn(dest)
|
|
|
|
|
|
tags = tags or []
|
|
|
added_file = jn(path, '{}_setup.py'.format(next(tempfile._RandomNameSequence())))
|
|
|
Command(cwd).execute('touch %s' % added_file)
|
|
|
Command(cwd).execute('%s add %s' % (vcs, added_file))
|
|
|
author_str = 'Marcin Kuźminski <me@email.com>'
|
|
|
|
|
|
for i in range(kwargs.get('files_no', 3)):
|
|
|
cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
|
|
|
Command(cwd).execute(cmd)
|
|
|
|
|
|
if vcs == 'hg':
|
|
|
cmd = """hg commit -m 'committed new %s' -u '%s' %s """ % (
|
|
|
i, author_str, added_file
|
|
|
)
|
|
|
elif vcs == 'git':
|
|
|
cmd = """%s && git commit -m 'committed new %s' %s""" % (
|
|
|
git_ident, i, added_file)
|
|
|
Command(cwd).execute(cmd)
|
|
|
|
|
|
for tag in tags:
|
|
|
if vcs == 'hg':
|
|
|
Command(cwd).execute(
|
|
|
'hg tag -m "{}" -u "{}" '.format(tag['commit'], author_str), tag['name'])
|
|
|
elif vcs == 'git':
|
|
|
if tag['commit']:
|
|
|
# annotated tag
|
|
|
_stdout, _stderr = Command(cwd).execute(
|
|
|
"""%s && git tag -a %s -m "%s" """ % (
|
|
|
git_ident, tag['name'], tag['commit']))
|
|
|
else:
|
|
|
# lightweight tag
|
|
|
_stdout, _stderr = Command(cwd).execute(
|
|
|
"""%s && git tag %s""" % (
|
|
|
git_ident, tag['name']))
|
|
|
|
|
|
|
|
|
def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None,
|
|
|
new_branch=False, **kwargs):
|
|
|
"""
|
|
|
Generate some files, add it to DEST repo and push back
|
|
|
vcs is git or hg and defines what VCS we want to make those files for
|
|
|
"""
|
|
|
git_ident = "git config user.name {} && git config user.email {}".format(
|
|
|
'Marcin Kuźminski', 'me@email.com')
|
|
|
cwd = path = jn(dest)
|
|
|
|
|
|
# commit some stuff into this repo
|
|
|
_add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs)
|
|
|
|
|
|
default_target_branch = {
|
|
|
'git': 'master',
|
|
|
'hg': 'default'
|
|
|
}.get(vcs)
|
|
|
|
|
|
target_branch = target_branch or default_target_branch
|
|
|
|
|
|
# PUSH it back
|
|
|
stdout = stderr = None
|
|
|
if vcs == 'hg':
|
|
|
maybe_new_branch = ''
|
|
|
if new_branch:
|
|
|
maybe_new_branch = '--new-branch'
|
|
|
stdout, stderr = Command(cwd).execute(
|
|
|
'hg push --verbose {} -r {} {}'.format(maybe_new_branch, target_branch, clone_url)
|
|
|
)
|
|
|
elif vcs == 'git':
|
|
|
stdout, stderr = Command(cwd).execute(
|
|
|
"""{} &&
|
|
|
git push --verbose --tags {} {}""".format(git_ident, clone_url, target_branch)
|
|
|
)
|
|
|
|
|
|
return stdout, stderr
|
|
|
|
|
|
|
|
|
def _check_proper_git_push(
|
|
|
stdout, stderr, branch='master', should_set_default_branch=False):
|
|
|
# Note: Git is writing most information to stderr intentionally
|
|
|
assert 'fatal' not in stderr
|
|
|
assert 'rejected' not in stderr
|
|
|
assert 'Pushing to' in stderr
|
|
|
assert '%s -> %s' % (branch, branch) in stderr
|
|
|
|
|
|
if should_set_default_branch:
|
|
|
assert "Setting default branch to %s" % branch in stderr
|
|
|
else:
|
|
|
assert "Setting default branch" not in stderr
|
|
|
|
|
|
|
|
|
def _check_proper_hg_push(stdout, stderr, branch='default'):
|
|
|
assert 'pushing to' in stdout
|
|
|
assert 'searching for changes' in stdout
|
|
|
|
|
|
assert 'abort:' not in stderr
|
|
|
|
|
|
|
|
|
def _check_proper_clone(stdout, stderr, vcs):
|
|
|
if vcs == 'hg':
|
|
|
assert 'requesting all changes' in stdout
|
|
|
assert 'adding changesets' in stdout
|
|
|
assert 'adding manifests' in stdout
|
|
|
assert 'adding file changes' in stdout
|
|
|
|
|
|
assert stderr == ''
|
|
|
|
|
|
if vcs == 'git':
|
|
|
assert '' == stdout
|
|
|
assert 'Cloning into' in stderr
|
|
|
assert 'abort:' not in stderr
|
|
|
assert 'fatal:' not in stderr
|
|
|
|