__init__.py
195 lines
| 6.3 KiB
| text/x-python
|
PythonLexer
r2456 | ||||
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r2456 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Base for test suite for making push/pull operations. | ||||
.. important:: | ||||
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started | ||||
to redirect things to stderr instead of stdout. | ||||
""" | ||||
from os.path import join as jn | ||||
r4926 | from subprocess import Popen, PIPE | |||
r2456 | import logging | |||
import os | ||||
import tempfile | ||||
r4994 | from rhodecode.lib.str_utils import safe_str | |||
r2456 | from rhodecode.tests import GIT_REPO, HG_REPO | |||
DEBUG = True | ||||
RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log') | ||||
REPO_GROUP = 'a_repo_group' | ||||
HG_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, HG_REPO) | ||||
GIT_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, GIT_REPO) | ||||
log = logging.getLogger(__name__) | ||||
class Command(object): | ||||
def __init__(self, cwd): | ||||
self.cwd = cwd | ||||
self.process = None | ||||
def execute(self, cmd, *args): | ||||
""" | ||||
Runs command on the system with given ``args``. | ||||
""" | ||||
command = cmd + ' ' + ' '.join(args) | ||||
if DEBUG: | ||||
r3061 | log.debug('*** CMD %s ***', command) | |||
r2456 | ||||
env = dict(os.environ) | ||||
# Delete coverage variables, as they make the test fail for Mercurial | ||||
for key in env.keys(): | ||||
if key.startswith('COV_CORE_'): | ||||
del env[key] | ||||
self.process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, | ||||
cwd=self.cwd, env=env) | ||||
stdout, stderr = self.process.communicate() | ||||
r4994 | ||||
stdout = safe_str(stdout) | ||||
stderr = safe_str(stderr) | ||||
r2456 | if DEBUG: | |||
r3061 | log.debug('STDOUT:%s', stdout) | |||
log.debug('STDERR:%s', stderr) | ||||
r2456 | return stdout, stderr | |||
def assert_returncode_success(self): | ||||
assert self.process.returncode == 0 | ||||
r4879 | def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs): | |||
r4994 | full_name = 'Marcin Kuźminski' | |||
email = 'me@email.com' | ||||
git_ident = f"git config user.name {full_name} && git config user.email {email}" | ||||
r2979 | cwd = path = jn(dest) | |||
r2456 | tags = tags or [] | |||
r4936 | added_file = jn(path, '{}_setup.py'.format(next(tempfile._RandomNameSequence()))) | |||
r2456 | Command(cwd).execute('touch %s' % added_file) | |||
Command(cwd).execute('%s add %s' % (vcs, added_file)) | ||||
author_str = 'Marcin Kuźminski <me@email.com>' | ||||
for i in range(kwargs.get('files_no', 3)): | ||||
r5087 | cmd = f"""echo 'added_line{i}' >> {added_file}""" | |||
r2456 | Command(cwd).execute(cmd) | |||
r4879 | ||||
r2456 | if vcs == 'hg': | |||
r5087 | cmd = f"""hg commit -m 'committed new {i}' -u '{author_str}' {added_file} """ | |||
r2456 | elif vcs == 'git': | |||
r5087 | cmd = f"""{git_ident} && git commit -m 'committed new {i}' {added_file}""" | |||
r2456 | Command(cwd).execute(cmd) | |||
for tag in tags: | ||||
if vcs == 'hg': | ||||
r2979 | Command(cwd).execute( | |||
r5087 | f"""hg tag -m "{tag['commit']}" -u "{author_str}" """, | |||
tag['name']) | ||||
r2456 | elif vcs == 'git': | |||
if tag['commit']: | ||||
# annotated tag | ||||
r2979 | _stdout, _stderr = Command(cwd).execute( | |||
r5087 | f"""{git_ident} && git tag -a {tag['name']} -m "{tag['commit']}" """ | |||
) | ||||
r2456 | else: | |||
# lightweight tag | ||||
r2979 | _stdout, _stderr = Command(cwd).execute( | |||
r5087 | f"""{git_ident} && git tag {tag['name']}""" | |||
) | ||||
r2456 | ||||
r2979 | ||||
def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None, | ||||
new_branch=False, **kwargs): | ||||
""" | ||||
Generate some files, add it to DEST repo and push back | ||||
vcs is git or hg and defines what VCS we want to make those files for | ||||
""" | ||||
r5087 | git_ident = "git config user.name Marcin Kuźminski && git config user.email me@email.com" | |||
cwd = jn(dest) | ||||
r2979 | ||||
# commit some stuff into this repo | ||||
_add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs) | ||||
default_target_branch = { | ||||
'git': 'master', | ||||
'hg': 'default' | ||||
}.get(vcs) | ||||
target_branch = target_branch or default_target_branch | ||||
r2456 | # PUSH it back | |||
stdout = stderr = None | ||||
if vcs == 'hg': | ||||
r2979 | maybe_new_branch = '' | |||
if new_branch: | ||||
maybe_new_branch = '--new-branch' | ||||
r2456 | stdout, stderr = Command(cwd).execute( | |||
r5087 | 'hg push --traceback --verbose {} -r {} {}'.format(maybe_new_branch, target_branch, clone_url) | |||
r2979 | ) | |||
r2456 | elif vcs == 'git': | |||
stdout, stderr = Command(cwd).execute( | ||||
r2979 | """{} && | |||
git push --verbose --tags {} {}""".format(git_ident, clone_url, target_branch) | ||||
) | ||||
r2456 | ||||
return stdout, stderr | ||||
def _check_proper_git_push( | ||||
stdout, stderr, branch='master', should_set_default_branch=False): | ||||
# Note: Git is writing most information to stderr intentionally | ||||
assert 'fatal' not in stderr | ||||
assert 'rejected' not in stderr | ||||
assert 'Pushing to' in stderr | ||||
assert '%s -> %s' % (branch, branch) in stderr | ||||
if should_set_default_branch: | ||||
assert "Setting default branch to %s" % branch in stderr | ||||
else: | ||||
assert "Setting default branch" not in stderr | ||||
def _check_proper_hg_push(stdout, stderr, branch='default'): | ||||
assert 'pushing to' in stdout | ||||
assert 'searching for changes' in stdout | ||||
assert 'abort:' not in stderr | ||||
def _check_proper_clone(stdout, stderr, vcs): | ||||
if vcs == 'hg': | ||||
assert 'requesting all changes' in stdout | ||||
assert 'adding changesets' in stdout | ||||
assert 'adding manifests' in stdout | ||||
assert 'adding file changes' in stdout | ||||
assert stderr == '' | ||||
if vcs == 'git': | ||||
assert '' == stdout | ||||
assert 'Cloning into' in stderr | ||||
assert 'abort:' not in stderr | ||||
assert 'fatal:' not in stderr | ||||