__init__.py
218 lines
| 6.9 KiB
| text/x-python
|
PythonLexer
r5607 | # Copyright (C) 2010-2024 RhodeCode GmbH | |||
r2456 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Base for test suite for making push/pull operations. | ||||
.. important:: | ||||
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started | ||||
to redirect things to stderr instead of stdout. | ||||
""" | ||||
r5459 | ||||
r2456 | import logging | |||
import os | ||||
import tempfile | ||||
r5459 | import subprocess | |||
r2456 | ||||
r4994 | from rhodecode.lib.str_utils import safe_str | |||
r5459 | from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO | |||
r2456 | ||||
DEBUG = True | ||||
REPO_GROUP = 'a_repo_group' | ||||
r5459 | HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}' | |||
GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}' | ||||
SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}' | ||||
r2456 | ||||
log = logging.getLogger(__name__) | ||||
class Command(object): | ||||
def __init__(self, cwd): | ||||
self.cwd = cwd | ||||
self.process = None | ||||
def execute(self, cmd, *args): | ||||
""" | ||||
Runs command on the system with given ``args``. | ||||
""" | ||||
command = cmd + ' ' + ' '.join(args) | ||||
if DEBUG: | ||||
r3061 | log.debug('*** CMD %s ***', command) | |||
r2456 | ||||
env = dict(os.environ) | ||||
# Delete coverage variables, as they make the test fail for Mercurial | ||||
for key in env.keys(): | ||||
if key.startswith('COV_CORE_'): | ||||
del env[key] | ||||
r5459 | self.process = subprocess.Popen( | |||
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, | ||||
cwd=self.cwd, env=env) | ||||
r2456 | stdout, stderr = self.process.communicate() | |||
r4994 | ||||
stdout = safe_str(stdout) | ||||
stderr = safe_str(stderr) | ||||
r2456 | if DEBUG: | |||
r3061 | log.debug('STDOUT:%s', stdout) | |||
log.debug('STDERR:%s', stderr) | ||||
r2456 | return stdout, stderr | |||
def assert_returncode_success(self): | ||||
assert self.process.returncode == 0 | ||||
r4879 | def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs): | |||
r4994 | full_name = 'Marcin Kuźminski' | |||
email = 'me@email.com' | ||||
git_ident = f"git config user.name {full_name} && git config user.email {email}" | ||||
r5459 | cwd = path = os.path.join(dest) | |||
r2979 | ||||
r2456 | tags = tags or [] | |||
r5459 | name_sequence = next(tempfile._RandomNameSequence()) | |||
added_file = os.path.join(path, f'{name_sequence}_setup.py') | ||||
Command(cwd).execute(f'touch {added_file}') | ||||
Command(cwd).execute(f'{vcs} add {added_file}') | ||||
r2456 | author_str = 'Marcin Kuźminski <me@email.com>' | |||
for i in range(kwargs.get('files_no', 3)): | ||||
r5087 | cmd = f"""echo 'added_line{i}' >> {added_file}""" | |||
r2456 | Command(cwd).execute(cmd) | |||
r4879 | ||||
r2456 | if vcs == 'hg': | |||
r5087 | cmd = f"""hg commit -m 'committed new {i}' -u '{author_str}' {added_file} """ | |||
r2456 | elif vcs == 'git': | |||
r5087 | cmd = f"""{git_ident} && git commit -m 'committed new {i}' {added_file}""" | |||
r2456 | Command(cwd).execute(cmd) | |||
for tag in tags: | ||||
if vcs == 'hg': | ||||
r2979 | Command(cwd).execute( | |||
r5087 | f"""hg tag -m "{tag['commit']}" -u "{author_str}" """, | |||
tag['name']) | ||||
r2456 | elif vcs == 'git': | |||
if tag['commit']: | ||||
# annotated tag | ||||
r2979 | _stdout, _stderr = Command(cwd).execute( | |||
r5087 | f"""{git_ident} && git tag -a {tag['name']} -m "{tag['commit']}" """ | |||
) | ||||
r2456 | else: | |||
# lightweight tag | ||||
r2979 | _stdout, _stderr = Command(cwd).execute( | |||
r5087 | f"""{git_ident} && git tag {tag['name']}""" | |||
) | ||||
r2456 | ||||
r2979 | ||||
def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None, | ||||
new_branch=False, **kwargs): | ||||
""" | ||||
Generate some files, add it to DEST repo and push back | ||||
vcs is git or hg and defines what VCS we want to make those files for | ||||
""" | ||||
r5087 | git_ident = "git config user.name Marcin Kuźminski && git config user.email me@email.com" | |||
r5459 | cwd = os.path.join(dest) | |||
r2979 | ||||
# commit some stuff into this repo | ||||
_add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs) | ||||
default_target_branch = { | ||||
'git': 'master', | ||||
'hg': 'default' | ||||
}.get(vcs) | ||||
target_branch = target_branch or default_target_branch | ||||
r2456 | # PUSH it back | |||
stdout = stderr = None | ||||
if vcs == 'hg': | ||||
r2979 | maybe_new_branch = '' | |||
if new_branch: | ||||
maybe_new_branch = '--new-branch' | ||||
r2456 | stdout, stderr = Command(cwd).execute( | |||
r5459 | f'hg push --traceback --verbose {maybe_new_branch} -r {target_branch} {clone_url}' | |||
r2979 | ) | |||
r2456 | elif vcs == 'git': | |||
stdout, stderr = Command(cwd).execute( | ||||
r5459 | f'{git_ident} && git push --verbose --tags {clone_url} {target_branch}' | |||
) | ||||
elif vcs == 'svn': | ||||
r5469 | username = kwargs.pop('username', '') | |||
password = kwargs.pop('password', '') | ||||
auth = '' | ||||
if username and password: | ||||
auth = f'--username {username} --password {password}' | ||||
r5459 | stdout, stderr = Command(cwd).execute( | |||
r5469 | f'svn commit --no-auth-cache --non-interactive {auth} -m "pushing to {target_branch}"' | |||
r2979 | ) | |||
r2456 | ||||
return stdout, stderr | ||||
def _check_proper_git_push( | ||||
stdout, stderr, branch='master', should_set_default_branch=False): | ||||
# Note: Git is writing most information to stderr intentionally | ||||
assert 'fatal' not in stderr | ||||
assert 'rejected' not in stderr | ||||
assert 'Pushing to' in stderr | ||||
assert '%s -> %s' % (branch, branch) in stderr | ||||
if should_set_default_branch: | ||||
assert "Setting default branch to %s" % branch in stderr | ||||
else: | ||||
assert "Setting default branch" not in stderr | ||||
def _check_proper_hg_push(stdout, stderr, branch='default'): | ||||
assert 'pushing to' in stdout | ||||
assert 'searching for changes' in stdout | ||||
assert 'abort:' not in stderr | ||||
r5459 | def _check_proper_svn_push(stdout, stderr): | |||
assert 'pushing to' in stdout | ||||
assert 'searching for changes' in stdout | ||||
assert 'abort:' not in stderr | ||||
r2456 | def _check_proper_clone(stdout, stderr, vcs): | |||
if vcs == 'hg': | ||||
assert 'requesting all changes' in stdout | ||||
assert 'adding changesets' in stdout | ||||
assert 'adding manifests' in stdout | ||||
assert 'adding file changes' in stdout | ||||
assert stderr == '' | ||||
if vcs == 'git': | ||||
assert '' == stdout | ||||
assert 'Cloning into' in stderr | ||||
assert 'abort:' not in stderr | ||||
assert 'fatal:' not in stderr | ||||
r5459 | ||||
if vcs == 'svn': | ||||
assert 'dupa' in stdout | ||||