__init__.py
193 lines
| 6.3 KiB
| text/x-python
|
PythonLexer
r2456 | # -*- coding: utf-8 -*- | |||
r4306 | # Copyright (C) 2010-2020 RhodeCode GmbH | |||
r2456 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Base for test suite for making push/pull operations. | ||||
.. important:: | ||||
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started | ||||
to redirect things to stderr instead of stdout. | ||||
""" | ||||
from os.path import join as jn | ||||
r4926 | from subprocess import Popen, PIPE | |||
r2456 | import logging | |||
import os | ||||
import tempfile | ||||
from rhodecode.tests import GIT_REPO, HG_REPO | ||||
DEBUG = True | ||||
RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log') | ||||
REPO_GROUP = 'a_repo_group' | ||||
HG_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, HG_REPO) | ||||
GIT_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, GIT_REPO) | ||||
log = logging.getLogger(__name__) | ||||
class Command(object): | ||||
def __init__(self, cwd): | ||||
self.cwd = cwd | ||||
self.process = None | ||||
def execute(self, cmd, *args): | ||||
""" | ||||
Runs command on the system with given ``args``. | ||||
""" | ||||
command = cmd + ' ' + ' '.join(args) | ||||
if DEBUG: | ||||
r3061 | log.debug('*** CMD %s ***', command) | |||
r2456 | ||||
env = dict(os.environ) | ||||
# Delete coverage variables, as they make the test fail for Mercurial | ||||
for key in env.keys(): | ||||
if key.startswith('COV_CORE_'): | ||||
del env[key] | ||||
self.process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, | ||||
cwd=self.cwd, env=env) | ||||
stdout, stderr = self.process.communicate() | ||||
if DEBUG: | ||||
r3061 | log.debug('STDOUT:%s', stdout) | |||
log.debug('STDERR:%s', stderr) | ||||
r2456 | return stdout, stderr | |||
def assert_returncode_success(self): | ||||
assert self.process.returncode == 0 | ||||
r4879 | def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs): | |||
r2979 | git_ident = "git config user.name {} && git config user.email {}".format( | |||
'Marcin Kuźminski', 'me@email.com') | ||||
cwd = path = jn(dest) | ||||
r2456 | tags = tags or [] | |||
r4879 | added_file = jn(path, '%s_setup.py' % tempfile._RandomNameSequence().next()) | |||
r2456 | Command(cwd).execute('touch %s' % added_file) | |||
Command(cwd).execute('%s add %s' % (vcs, added_file)) | ||||
author_str = 'Marcin Kuźminski <me@email.com>' | ||||
for i in range(kwargs.get('files_no', 3)): | ||||
cmd = """echo 'added_line%s' >> %s""" % (i, added_file) | ||||
Command(cwd).execute(cmd) | ||||
r4879 | ||||
r2456 | if vcs == 'hg': | |||
r4879 | cmd = """hg commit -m 'committed new %s' -u '%s' %s """ % ( | |||
r2456 | i, author_str, added_file | |||
) | ||||
elif vcs == 'git': | ||||
r4879 | cmd = """%s && git commit -m 'committed new %s' %s""" % ( | |||
r2456 | git_ident, i, added_file) | |||
Command(cwd).execute(cmd) | ||||
for tag in tags: | ||||
if vcs == 'hg': | ||||
r2979 | Command(cwd).execute( | |||
r4879 | 'hg tag -m "{}" -u "{}" '.format(tag['commit'], author_str), tag['name']) | |||
r2456 | elif vcs == 'git': | |||
if tag['commit']: | ||||
# annotated tag | ||||
r2979 | _stdout, _stderr = Command(cwd).execute( | |||
r2456 | """%s && git tag -a %s -m "%s" """ % ( | |||
git_ident, tag['name'], tag['commit'])) | ||||
else: | ||||
# lightweight tag | ||||
r2979 | _stdout, _stderr = Command(cwd).execute( | |||
r2456 | """%s && git tag %s""" % ( | |||
git_ident, tag['name'])) | ||||
r2979 | ||||
def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None, | ||||
new_branch=False, **kwargs): | ||||
""" | ||||
Generate some files, add it to DEST repo and push back | ||||
vcs is git or hg and defines what VCS we want to make those files for | ||||
""" | ||||
git_ident = "git config user.name {} && git config user.email {}".format( | ||||
'Marcin Kuźminski', 'me@email.com') | ||||
cwd = path = jn(dest) | ||||
# commit some stuff into this repo | ||||
_add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs) | ||||
default_target_branch = { | ||||
'git': 'master', | ||||
'hg': 'default' | ||||
}.get(vcs) | ||||
target_branch = target_branch or default_target_branch | ||||
r2456 | # PUSH it back | |||
stdout = stderr = None | ||||
if vcs == 'hg': | ||||
r2979 | maybe_new_branch = '' | |||
if new_branch: | ||||
maybe_new_branch = '--new-branch' | ||||
r2456 | stdout, stderr = Command(cwd).execute( | |||
r2979 | 'hg push --verbose {} -r {} {}'.format(maybe_new_branch, target_branch, clone_url) | |||
) | ||||
r2456 | elif vcs == 'git': | |||
stdout, stderr = Command(cwd).execute( | ||||
r2979 | """{} && | |||
git push --verbose --tags {} {}""".format(git_ident, clone_url, target_branch) | ||||
) | ||||
r2456 | ||||
return stdout, stderr | ||||
def _check_proper_git_push( | ||||
stdout, stderr, branch='master', should_set_default_branch=False): | ||||
# Note: Git is writing most information to stderr intentionally | ||||
assert 'fatal' not in stderr | ||||
assert 'rejected' not in stderr | ||||
assert 'Pushing to' in stderr | ||||
assert '%s -> %s' % (branch, branch) in stderr | ||||
if should_set_default_branch: | ||||
assert "Setting default branch to %s" % branch in stderr | ||||
else: | ||||
assert "Setting default branch" not in stderr | ||||
def _check_proper_hg_push(stdout, stderr, branch='default'): | ||||
assert 'pushing to' in stdout | ||||
assert 'searching for changes' in stdout | ||||
assert 'abort:' not in stderr | ||||
def _check_proper_clone(stdout, stderr, vcs): | ||||
if vcs == 'hg': | ||||
assert 'requesting all changes' in stdout | ||||
assert 'adding changesets' in stdout | ||||
assert 'adding manifests' in stdout | ||||
assert 'adding file changes' in stdout | ||||
assert stderr == '' | ||||
if vcs == 'git': | ||||
assert '' == stdout | ||||
assert 'Cloning into' in stderr | ||||
assert 'abort:' not in stderr | ||||
assert 'fatal:' not in stderr | ||||