##// END OF EJS Templates
deps: bumped s3fs==2024.10.0 and fsspec==2024.10.0
deps: bumped s3fs==2024.10.0 and fsspec==2024.10.0

File last commit:

r5607:39b20522 default
r5615:54cd00bd default
Show More
__init__.py
218 lines | 6.9 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Base for test suite for making push/pull operations.
.. important::
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
to redirect things to stderr instead of stdout.
"""
import logging
import os
import tempfile
import subprocess
from rhodecode.lib.str_utils import safe_str
from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO
# When true, Command.execute logs every command line plus its stdout/stderr
# at debug level.
DEBUG = True
# Repository group name used as the prefix for the grouped repo names below.
REPO_GROUP = 'a_repo_group'
# "<group>/<repo>" names for the hg, git and svn test repositories.
HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'
# Module-level logger.
log = logging.getLogger(__name__)
class Command:
    """
    Runs shell commands inside a fixed working directory.

    The most recently started process is kept on ``self.process`` so that
    its return code can be checked with :meth:`assert_returncode_success`.
    """

    def __init__(self, cwd):
        """
        :param cwd: directory in which every executed command is started.
        """
        self.cwd = cwd
        self.process = None

    @staticmethod
    def _subprocess_env(environ=None):
        """
        Build the environment passed to the child process.

        Returns a new dict copied from ``environ`` (``os.environ`` when not
        given) with every ``COV_CORE_*`` key left out. The source mapping is
        never modified.
        """
        source = os.environ if environ is None else environ
        # Coverage variables are dropped, as they make the test fail for
        # Mercurial. A new dict is built instead of deleting keys while
        # iterating, which raises RuntimeError in Python 3.
        return {
            key: value
            for key, value in source.items()
            if not key.startswith('COV_CORE_')
        }

    def execute(self, cmd, *args):
        """
        Runs command on the system with given ``args``.

        ``cmd`` and ``args`` are joined with single spaces and run through
        the shell, so shell syntax such as ``&&`` and ``>>`` is honoured.

        :returns: ``(stdout, stderr)`` of the finished process, each passed
            through ``safe_str``.
        """
        command = cmd + ' ' + ' '.join(args)
        if DEBUG:
            log.debug('*** CMD %s ***', command)

        self.process = subprocess.Popen(
            command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            cwd=self.cwd, env=self._subprocess_env())
        stdout, stderr = self.process.communicate()

        stdout = safe_str(stdout)
        stderr = safe_str(stderr)

        if DEBUG:
            log.debug('STDOUT:%s', stdout)
            log.debug('STDERR:%s', stderr)
        return stdout, stderr

    def assert_returncode_success(self):
        """Assert that the last executed command exited with status 0."""
        assert self.process.returncode == 0
def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs):
    """
    Create a new file inside the ``dest`` working copy and modify it
    line by line, committing each change for hg and git.

    :param vcs: VCS command name. ``'hg'`` and ``'git'`` get one commit per
        appended line plus optional tags; for any other value (e.g. ``'svn'``)
        the file is only added and modified.
    :param dest: path of the working copy in which all commands are run.
    :param clone_url: not used by this function.
    :param tags: optional list of dicts with ``name`` and ``commit`` keys.
        ``commit`` is used as the tag message; for git an empty ``commit``
        produces a lightweight tag instead of an annotated one.
    :param target_branch: not used by this function.
    :param new_branch: not used by this function.
    :param kwargs: ``files_no`` sets how many lines are appended
        (default 3).
    """
    full_name = 'Marcin Kuźminski'
    email = 'me@email.com'
    # The name contains a space, so it must be quoted: unquoted, git treats
    # the second word as a value-pattern and stores only the first word.
    git_ident = f'git config user.name "{full_name}" && git config user.email {email}'
    cwd = os.path.join(dest)

    tags = tags or []

    # NOTE(review): relies on a private tempfile helper to get a random name
    name_sequence = next(tempfile._RandomNameSequence())
    added_file = os.path.join(cwd, f'{name_sequence}_setup.py')

    Command(cwd).execute(f'touch {added_file}')
    Command(cwd).execute(f'{vcs} add {added_file}')
    author_str = f'{full_name} <{email}>'

    for i in range(kwargs.get('files_no', 3)):
        Command(cwd).execute(f"""echo 'added_line{i}' >> {added_file}""")

        if vcs == 'hg':
            commit_cmd = f"""hg commit -m 'committed new {i}' -u '{author_str}' {added_file} """
        elif vcs == 'git':
            commit_cmd = f"""{git_ident} && git commit -m 'committed new {i}' {added_file}"""
        else:
            # No per-line commit exists for other VCSs (e.g. svn); skip
            # instead of re-running the echo command a second time.
            continue
        Command(cwd).execute(commit_cmd)

    for tag in tags:
        if vcs == 'hg':
            Command(cwd).execute(
                f"""hg tag -m "{tag['commit']}" -u "{author_str}" """,
                tag['name'])
        elif vcs == 'git':
            if tag['commit']:
                # annotated tag
                Command(cwd).execute(
                    f"""{git_ident} && git tag -a {tag['name']} -m "{tag['commit']}" """
                )
            else:
                # lightweight tag
                Command(cwd).execute(
                    f"""{git_ident} && git tag {tag['name']}"""
                )
def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None,
                        new_branch=False, **kwargs):
    """
    Generate some files, add them to the DEST repo and push them back.

    :param vcs: ``'hg'``, ``'git'`` or ``'svn'``; selects which VCS the files
        are created and pushed with.
    :param dest: path of the working copy in which all commands are run.
    :param clone_url: remote URL passed to ``hg push`` / ``git push``.
    :param tags: optional tag definitions, forwarded to ``_add_files``.
    :param target_branch: branch/revision to push; defaults to ``'master'``
        for git and ``'default'`` for hg.
    :param new_branch: when true, ``--new-branch`` is passed to ``hg push``.
    :param kwargs: forwarded to ``_add_files``; for svn, ``username`` and
        ``password`` are used as commit credentials when both are set.
    :returns: ``(stdout, stderr)`` of the push/commit command, or
        ``(None, None)`` when ``vcs`` is none of the supported values.
    """
    # The name contains a space, so it must be quoted: unquoted, git treats
    # the second word as a value-pattern and stores only the first word.
    git_ident = 'git config user.name "Marcin Kuźminski" && git config user.email me@email.com'
    cwd = os.path.join(dest)

    # commit some stuff into this repo
    _add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs)

    default_target_branch = {
        'git': 'master',
        'hg': 'default'
    }.get(vcs)

    target_branch = target_branch or default_target_branch

    # PUSH it back
    stdout = stderr = None
    if vcs == 'hg':
        maybe_new_branch = '--new-branch' if new_branch else ''
        stdout, stderr = Command(cwd).execute(
            f'hg push --traceback --verbose {maybe_new_branch} -r {target_branch} {clone_url}'
        )
    elif vcs == 'git':
        stdout, stderr = Command(cwd).execute(
            f'{git_ident} && git push --verbose --tags {clone_url} {target_branch}'
        )
    elif vcs == 'svn':
        username = kwargs.pop('username', '')
        password = kwargs.pop('password', '')
        # credentials are only passed when both parts are provided
        auth = ''
        if username and password:
            auth = f'--username {username} --password {password}'

        stdout, stderr = Command(cwd).execute(
            f'svn commit --no-auth-cache --non-interactive {auth} -m "pushing to {target_branch}"'
        )

    return stdout, stderr
def _check_proper_git_push(
stdout, stderr, branch='master', should_set_default_branch=False):
# Note: Git is writing most information to stderr intentionally
assert 'fatal' not in stderr
assert 'rejected' not in stderr
assert 'Pushing to' in stderr
assert '%s -> %s' % (branch, branch) in stderr
if should_set_default_branch:
assert "Setting default branch to %s" % branch in stderr
else:
assert "Setting default branch" not in stderr
def _check_proper_hg_push(stdout, stderr, branch='default'):
assert 'pushing to' in stdout
assert 'searching for changes' in stdout
assert 'abort:' not in stderr
def _check_proper_svn_push(stdout, stderr):
assert 'pushing to' in stdout
assert 'searching for changes' in stdout
assert 'abort:' not in stderr
def _check_proper_clone(stdout, stderr, vcs):
if vcs == 'hg':
assert 'requesting all changes' in stdout
assert 'adding changesets' in stdout
assert 'adding manifests' in stdout
assert 'adding file changes' in stdout
assert stderr == ''
if vcs == 'git':
assert '' == stdout
assert 'Cloning into' in stderr
assert 'abort:' not in stderr
assert 'fatal:' not in stderr
if vcs == 'svn':
assert 'dupa' in stdout