##// END OF EJS Templates
setup: avoid setuptools 67 - it can't handle celery's broken pytz dependency...
setup: avoid setuptools 67 - it can't handle celery's broken pytz dependency With setuptools 67 or later, launching Kallithea fails as: $ gearbox serve -c my.ini --reload 15:56:54,111 ERROR [gearbox] Expected closing RIGHT_PARENTHESIS pytz (>dev) ~^ The `packaging` vendored in setuptools cannot handle the broken syntax `Requires-Dist: pytz (>dev)` in venv/lib/python3.11/site-packages/celery-5.0.5.dist-info/METADATA . The old celery version currently used by Kallithea is wrong, and setuptools has moved on after a reasonable grace period. We thus have to work around and avoid latest setuptools. See https://github.com/pypa/setuptools/issues/3889 .

File last commit:

r8762:0e7dab99 default
r8767:0a9ddb8c stable
Show More
test_vcs_operations.py
641 lines | 28.9 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Test suite for vcs push/pull operations.
The tests need Git > 1.8.1.
This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Dec 30, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""
import json
import os
import re
import tempfile
import time
import urllib.request
from subprocess import PIPE, Popen
from tempfile import _RandomNameSequence
import pytest
import kallithea
from kallithea.lib.utils2 import ascii_bytes, safe_str
from kallithea.model import db, meta
from kallithea.model.ssh_key import SshKeyModel
from kallithea.model.user import UserModel
from kallithea.tests import base
from kallithea.tests.fixture import Fixture
# When true, Command.execute() prints each command line and any captured
# stdout/stderr.
DEBUG = True
HOST = '127.0.0.1:4999' # test host
# Shared Fixture instance; used below to create repository forks.
fixture = Fixture()
# Parameterize different kinds of VCS testing - both the kind of VCS and the
# access method (HTTP/SSH)
# Mixins for using HTTP and SSH URLs
class HttpVcsTest:
    """Mixin that builds clone URLs for HTTP access."""

    @staticmethod
    def repo_url_param(webserver, repo_name, **kwargs):
        """Return ``webserver.repo_url(repo_name, **kwargs)``.

        All keyword arguments are forwarded untouched to the web server
        object, which is responsible for composing the URL.
        """
        url = webserver.repo_url(repo_name, **kwargs)
        return url
class SshVcsTest:
    """Mixin that builds clone parameters for SSH access.

    Concrete subclasses provide ``_ssh_param`` which turns the resolved user
    and SSH key into the VCS specific command line fragment.
    """

    # Public key that gets registered for a test login that has no key yet.
    public_keys = {
        base.TEST_USER_REGULAR_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost',
        base.TEST_USER_ADMIN_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost',
    }

    @classmethod
    def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR):
        """Return the SSH clone parameter for ``repo_name`` as ``username``.

        The user's first SSH key is reused when one exists; otherwise the
        matching entry of ``public_keys`` is registered and committed.
        ``webserver`` and ``password`` are accepted but not used here.
        """
        account = db.User.get_by_username(username)
        if not account.ssh_keys:
            key = SshKeyModel().create(account, 'test key', cls.public_keys[account.username])
            meta.Session().commit()
        else:
            key = account.ssh_keys[0]
        return cls._ssh_param(repo_name, account, key, client_ip)
# Mixins for using Mercurial and Git
class HgVcsTest:
    """Mixin selecting Mercurial and the Mercurial test repository."""

    repo_name = base.HG_REPO
    repo_type = 'hg'
class GitVcsTest:
    """Mixin selecting Git and the Git test repository."""

    repo_name = base.GIT_REPO
    repo_type = 'git'
# Combine mixins to give the combinations we want to parameterize tests with
class HgHttpVcsTest(HgVcsTest, HttpVcsTest):
    """Test parameter: Mercurial repository accessed over HTTP."""
class GitHttpVcsTest(GitVcsTest, HttpVcsTest):
    """Test parameter: Git repository accessed over HTTP."""
class HgSshVcsTest(HgVcsTest, SshVcsTest):
    """Test parameter: Mercurial repository accessed over SSH."""

    @staticmethod
    def _ssh_param(repo_name, user, ssh_key, client_ip):
        """Return the hg command line fragment for an emulated SSH session.

        The fragment overrides ``ui.ssh`` with a bash one-liner that invokes
        ``kallithea-cli ssh-serve`` directly, followed by a placeholder
        ``ssh://`` URL for ``repo_name``.
        """
        # Specify a custom ssh command on the command line
        template = r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s"""
        values = (
            client_ip,
            kallithea.CONFIG['__file__'],
            user.user_id,
            ssh_key.user_ssh_key_id,
            repo_name,
        )
        return template % values
class GitSshVcsTest(GitVcsTest, SshVcsTest):
    """Test parameter: Git repository accessed over SSH."""

    @staticmethod
    def _ssh_param(repo_name, user, ssh_key, client_ip):
        """Return a placeholder ``ssh://`` URL for ``repo_name``.

        Side effect: ``GIT_SSH_COMMAND`` is set in ``os.environ`` to a bash
        one-liner that invokes ``kallithea-cli ssh-serve`` for the given user
        and key, and it stays set after this call returns.
        """
        # Set a custom ssh command in the global environment
        os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % (
            client_ip,
            kallithea.CONFIG['__file__'],
            user.user_id,
            ssh_key.user_ssh_key_id)
        # The original literal was followed by a stray empty string literal
        # (leftover quotes); the resulting URL is unchanged.
        return "ssh://someuser@somehost/%s" % repo_name
# Decorators applied to tests that take a ``vt`` argument; each listed class
# combines a VCS mixin with an access-method mixin.

# All four VCS / access-method combinations.
parametrize_vcs_test = base.parametrize('vt', [
    HgHttpVcsTest,
    GitHttpVcsTest,
    HgSshVcsTest,
    GitSshVcsTest,
])
# Mercurial only, over both HTTP and SSH.
parametrize_vcs_test_hg = base.parametrize('vt', [
    HgHttpVcsTest,
    HgSshVcsTest,
])
# HTTP only, for both Mercurial and Git.
parametrize_vcs_test_http = base.parametrize('vt', [
    HgHttpVcsTest,
    GitHttpVcsTest,
])
class Command(object):
    """Run shell commands from a fixed working directory."""

    def __init__(self, cwd):
        """Remember ``cwd`` as the working directory for later commands."""
        self.cwd = cwd

    def execute(self, *args, **environ):
        """
        Runs command on the system with given ``args`` using simple space
        join without safe quoting.

        Keyword arguments are added to the environment of the child process,
        except ``ignoreReturnCode`` which (when true) disables the check that
        the command exited with status 0.

        Returns a ``(stdout, stderr)`` tuple passed through ``safe_str``.
        Raises AssertionError on a non-zero exit status unless
        ``ignoreReturnCode`` is set.
        """
        command = ' '.join(args)
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
        if DEBUG:
            print('*** CMD %s ***' % command)
        # Start from the current environment, then pin locale and Mercurial
        # settings before applying the caller's overrides.
        testenv = dict(os.environ)
        testenv['LANG'] = 'en_US.UTF-8'
        testenv['LANGUAGE'] = 'en_US:en'
        testenv['HGPLAIN'] = ''
        testenv['HGRCPATH'] = ''
        testenv.update(environ)
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
        stdout, stderr = p.communicate()
        if DEBUG:
            if stdout:
                print('stdout:', stdout)
            if stderr:
                print('stderr:', stderr)
        if not ignoreReturnCode:
            # Include the command and its stderr so a failure is diagnosable
            # even when DEBUG output is not shown.
            assert p.returncode == 0, 'command %r exited with status %s; stderr: %r' % (command, p.returncode, stderr)
        return safe_str(stdout), safe_str(stderr)
def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
    """Create a fresh directory below ``base.TESTS_TMP_PATH`` and return its path."""
    parent = base.TESTS_TMP_PATH
    return tempfile.mkdtemp(suffix, prefix, parent)
def _add_files(vcs, dest_dir, files_no=3):
    """
    Add one new, randomly named file to the working copy in dest_dir and
    create files_no commits, each appending one line to that file.

    :param vcs: 'hg' or 'git'; selects the commit command line
    :param dest_dir: working copy in which the commands are run
    :param files_no: number of append-and-commit rounds
    """
    file_name = '%ssetup.py' % next(_RandomNameSequence())
    # Create the (empty) file before adding it to version control
    with open(os.path.join(dest_dir, file_name), 'a'):
        pass
    Command(dest_dir).execute(vcs, 'add', file_name)

    email = 'me@example.com'
    author_template = 'User <%s>' if os.name == 'nt' else 'User ǝɯɐᴎ <%s>'
    author_str = author_template % email

    commit_templates = {
        'hg': """hg commit -m "committed new %s" -u "%s" "%s" """,
        'git': """git commit -m "committed new %s" --author "%s" "%s" """,
    }
    template = commit_templates.get(vcs)

    for round_no in range(files_no):
        append_cmd = """echo "added_line%s" >> %s""" % (round_no, file_name)
        Command(dest_dir).execute(append_cmd)
        if template is None:
            # Unknown vcs: no commit command is built, the append command is
            # what gets executed a second time.
            cmd = append_cmd
        else:
            cmd = template % (round_no, author_str, file_name)
        # git commit needs EMAIL on some machines
        Command(dest_dir).execute(cmd, EMAIL=email)
def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3):
    """Commit new content in ``dest_dir`` and force-push it to ``clone_url``.

    Returns the ``(stdout, stderr)`` of the push command, or ``(None, None)``
    when ``vt.repo_type`` is neither 'hg' nor 'git'. ``webserver`` is not
    used by this helper.
    """
    kind = vt.repo_type
    _add_files(kind, dest_dir, files_no=files_no)

    # PUSH it back
    if kind == 'hg':
        return Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
    if kind == 'git':
        return Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)
    return None, None
def _check_outgoing(vcs, cwd, clone_url):
    """Return ``(stdout, stderr)`` listing changes in ``cwd`` not yet pushed.

    Returns None when ``vcs`` is neither 'hg' nor 'git'.
    """
    runner = Command(cwd)
    if vcs == 'git':
        runner.execute('git remote update')
        return runner.execute('git log origin/master..master')
    if vcs == 'hg':
        # hg removes the password from default URLs, so we have to provide it here via the clone_url
        return runner.execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
    return None
def set_anonymous_access(enable=True):
    """Set the ``active`` flag of the default user and commit the change.

    The default user is fetched again afterwards; an Exception is raised if
    its flag does not match ``enable``.
    """
    default_user = db.User.get_default_user()
    default_user.active = enable
    meta.Session().commit()

    stored = db.User.get_default_user().active
    if enable != stored:
        raise Exception('Cannot set anonymous access')
#==============================================================================
# TESTS
#==============================================================================
def _check_proper_git_push(stdout, stderr):
assert 'fatal' not in stderr
assert 'rejected' not in stderr
assert 'Pushing to' in stderr
assert 'master -> master' in stderr
@pytest.mark.usefixtures("test_context_fixture")
class TestVCSOperations(base.TestController):
    """Clone, pull and push tests that run the real hg/git command line clients.

    Most tests receive a ``vt`` parameter: one of the VCS/access-method
    classes listed in the ``parametrize_vcs_test*`` decorators.
    """

    @classmethod
    def setup_class(cls):
        # DISABLE ANONYMOUS ACCESS
        set_anonymous_access(False)

    @pytest.fixture()
    def testhook_cleanup(self):
        """Remove every ``*.testhook`` hook entry once the test has finished."""
        yield
        # remove hook
        for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']:
            entry = db.Ui.get_by_key('hooks', '%s.testhook' % hook)
            if entry:
                meta.Session().delete(entry)
        meta.Session().commit()

    @pytest.fixture(scope="module")
    def testfork(self):
        """Create one fork per VCS and return their names keyed by 'git'/'hg'."""
        # create fork so the repo stays untouched
        git_fork_name = '%s_fork%s' % (base.GIT_REPO, next(_RandomNameSequence()))
        fixture.create_fork(base.GIT_REPO, git_fork_name)
        hg_fork_name = '%s_fork%s' % (base.HG_REPO, next(_RandomNameSequence()))
        fixture.create_fork(base.HG_REPO, hg_fork_name)
        return {'git': git_fork_name, 'hg': hg_fork_name}

    @parametrize_vcs_test
    def test_clone_repo_by_admin(self, webserver, vt):
        """A clone with the default (admin) credentials succeeds."""
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())

        if vt.repo_type == 'git':
            assert 'Cloning into' in stdout + stderr
            assert stderr == '' or stdout == ''
        elif vt.repo_type == 'hg':
            assert 'requesting all changes' in stdout
            assert 'adding changesets' in stdout
            assert 'adding manifests' in stdout
            assert 'adding file changes' in stdout
            assert stderr == ''

    @parametrize_vcs_test_http
    def test_clone_wrong_credentials(self, webserver, vt):
        """A clone with a bad password is reported as an authentication failure."""
        clone_url = vt.repo_url_param(webserver, vt.repo_name, password='bad!')
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'fatal: Authentication failed' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: authorization failed' in stderr

    def test_clone_git_dir_as_hg(self, webserver):
        """Cloning the Git repository with the hg client fails."""
        clone_url = HgHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        # Parentheses only spell out the precedence the expression already had.
        assert 'HTTP Error 404: Not Found' in stderr or ("not a valid repository" in stdout and 'abort:' in stderr)

    def test_clone_hg_repo_as_git(self, webserver):
        """Cloning the Mercurial repository with the git client fails."""
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.HG_REPO)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'not found' in stderr

    @parametrize_vcs_test
    def test_clone_non_existing_path(self, webserver, vt):
        """Cloning a repository name that does not exist fails."""
        clone_url = vt.repo_url_param(webserver, 'trololo')
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'not found' in stderr or 'abort: Access to %r denied' % 'trololo' in stderr
        elif vt.repo_type == 'hg':
            # Parentheses only spell out the precedence the expression already had.
            assert 'HTTP Error 404: Not Found' in stderr or ('abort: no suitable response from remote hg' in stderr and 'remote: abort: Access to %r denied' % 'trololo' in stdout + stderr)

    @parametrize_vcs_test
    def test_push_new_repo(self, webserver, vt):
        """Push three commits to a repository freshly created through the API."""
        # Clear the log so we know what is added
        db.UserLog.query().delete()
        meta.Session().commit()

        # Create an empty server repo using the API
        repo_name = 'new_%s_%s' % (vt.repo_type, next(_RandomNameSequence()))
        usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
        params = {
            "id": 7,
            "api_key": usr.api_key,
            "method": 'create_repo',
            "args": dict(repo_name=repo_name,
                         owner=base.TEST_USER_ADMIN_LOGIN,
                         repo_type=vt.repo_type),
        }
        req = urllib.request.Request(
            'http://%s:%s/_admin/api' % webserver.server_address,
            data=ascii_bytes(json.dumps(params)),
            headers={'content-type': 'application/json'})
        # Close the HTTP response as soon as the body has been read.
        with urllib.request.urlopen(req) as response:
            result = json.loads(response.read())
        # Expect something like:
        # {u'result': {u'msg': u'Created new repository `new_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None}
        assert result['result']['success']

        # Create local clone of the empty server repo
        local_clone_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, local_clone_dir)

        # Make 3 commits and push to the empty server repo.
        # The server repo doesn't have any other heads than the
        # refs/heads/master we are pushing, but the `git log` in the push hook
        # should still list the 3 commits.
        stdout, stderr = _add_files_and_push(webserver, vt, local_clone_dir, clone_url=clone_url)
        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)
        elif vt.repo_type == 'hg':
            assert 'pushing to ' in stdout
            assert 'remote: added ' in stdout

        # Verify that we got the right events in UserLog. Expect something like:
        # <UserLog('id:new_git_XXX:started_following_repo')>
        # <UserLog('id:new_git_XXX:user_created_repo')>
        # <UserLog('id:new_git_XXX:pull')>
        # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')>
        meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == ([
            ('started_following_repo', 0),
            ('user_created_repo', 0),
            ('pull', 0),
            ('push', 3)]
            if vt.repo_type == 'git' else [
            ('started_following_repo', 0),
            ('user_created_repo', 0),
            # (u'pull', 0), # Mercurial outgoing hook is not called for empty clones
            ('push', 3)])

    @parametrize_vcs_test
    def test_push_new_file(self, webserver, testfork, vt):
        """Clone the test repository and push three new commits to its fork."""
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url)

        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)
        elif vt.repo_type == 'hg':
            assert 'pushing to' in stdout
            assert 'Repository size' in stdout
            assert 'Last revision is now' in stdout

        meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
            [('pull', 0), ('push', 3)]

    @parametrize_vcs_test
    def test_pull(self, webserver, testfork, vt):
        """Pull into an empty repository, then pull again through URL variants."""
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'init', dest_dir)

        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url)
        meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)

        if vt.repo_type == 'git':
            assert 'FETCH_HEAD' in stderr
        elif vt.repo_type == 'hg':
            assert 'new changesets' in stdout

        action_parts = [ul.action for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert action_parts == ['pull']

        # Test handling of URLs with extra '/' around repo_name
        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/./%s/' % vt.repo_name), ignoreReturnCode=True)
        if issubclass(vt, HttpVcsTest):
            if vt.repo_type == 'git':
                # NOTE: when pulling from http://hostname/./vcs_test_git/ , the git client will normalize that and issue an HTTP request to /vcs_test_git/info/refs
                assert 'Already up to date.' in stdout
            else:
                assert vt.repo_type == 'hg'
                assert "abort: HTTP Error 404: Not Found" in stderr
        else:
            assert issubclass(vt, SshVcsTest)
            if vt.repo_type == 'git':
                assert "abort: Access to './%s' denied" % vt.repo_name in stderr
            else:
                assert "abort: Access to './%s' denied" % vt.repo_name in stdout + stderr

        stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/%s/' % vt.repo_name), ignoreReturnCode=True)
        if vt.repo_type == 'git':
            assert 'Already up to date.' in stdout
        else:
            assert vt.repo_type == 'hg'
            assert "no changes found" in stdout
        assert "denied" not in stderr
        assert "denied" not in stdout
        assert "404" not in stdout

    @parametrize_vcs_test
    def test_push_invalidates_cache(self, webserver, testfork, vt):
        """After a push the repository's reported last changeset has changed."""
        pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])]

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, files_no=1, clone_url=clone_url)

        if vt.repo_type == 'git':
            _check_proper_git_push(stdout, stderr)

        meta.Session.close() # expire session to make sure SA fetches new Repository instances after last_changeset has been updated by server side hook in another process
        post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])]
        assert pre_cached_tip != post_cached_tip

    @parametrize_vcs_test_http
    def test_push_wrong_credentials(self, webserver, vt):
        """A push with unknown credentials is reported as an authentication failure."""
        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        clone_url = webserver.repo_url(vt.repo_name, username='bad', password='name')
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir,
                                             clone_url=clone_url, ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'fatal: Authentication failed' in stderr
        elif vt.repo_type == 'hg':
            assert 'abort: authorization failed' in stderr

    @parametrize_vcs_test
    def test_push_with_readonly_credentials(self, webserver, vt):
        """The regular test user can clone but is refused when pushing."""
        db.UserLog.query().delete()
        meta.Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name, username=base.TEST_USER_REGULAR_LOGIN, password=base.TEST_USER_REGULAR_PASS)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, ignoreReturnCode=True, clone_url=clone_url)

        if vt.repo_type == 'git':
            assert 'The requested URL returned error: 403' in stderr or 'abort: Push access to %r denied' % str(vt.repo_name) in stderr
        elif vt.repo_type == 'hg':
            # Parentheses only spell out the precedence the expression already had.
            assert 'abort: HTTP Error 403: Forbidden' in stderr or ('abort: push failed on remote' in stderr and 'remote: Push access to %r denied' % str(vt.repo_name) in stdout)

        meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
        action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
            [('pull', 0)]

    @parametrize_vcs_test
    def test_push_back_to_wrong_url(self, webserver, vt):
        """Pushing to a URL that is not a repository fails with 'not found'."""
        dest_dir = _get_tmp_dir()
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(
            webserver, vt, dest_dir, clone_url='http://%s:%s/tmp' % (
                webserver.server_address[0], webserver.server_address[1]),
            ignoreReturnCode=True)

        if vt.repo_type == 'git':
            assert 'not found' in stderr
        elif vt.repo_type == 'hg':
            assert 'HTTP Error 404: Not Found' in stderr

    @parametrize_vcs_test
    def test_ip_restriction(self, webserver, vt):
        """Cloning is refused while an excluding IP rule exists and works after its removal."""
        user_model = UserModel()
        try:
            # Add IP constraint that excludes the test context:
            user_model.add_extra_ip(base.TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            meta.Session().commit()
            # IP permissions are cached, need to wait for the cache in the server process to expire
            time.sleep(1.5)
            clone_url = vt.repo_url_param(webserver, vt.repo_name)
            stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
            if vt.repo_type == 'git':
                # The message apparently changed in Git 1.8.3, so match it loosely.
                assert re.search(r'\b403\b', stderr) or 'abort: User test_admin from 127.0.0.127 cannot be authorized' in stderr
            elif vt.repo_type == 'hg':
                assert 'abort: HTTP Error 403: Forbidden' in stderr or 'remote: abort: User test_admin from 127.0.0.127 cannot be authorized' in stdout + stderr
        finally:
            # release IP restrictions
            for ip in db.UserIpMap.query():
                db.UserIpMap.delete(ip.ip_id)
            meta.Session().commit()
            # IP permissions are cached, need to wait for the cache in the server process to expire
            time.sleep(1.5)

        clone_url = vt.repo_url_param(webserver, vt.repo_name)
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())

        if vt.repo_type == 'git':
            assert 'Cloning into' in stdout + stderr
            assert stderr == '' or stdout == ''
        elif vt.repo_type == 'hg':
            assert 'requesting all changes' in stdout
            assert 'adding changesets' in stdout
            assert 'adding manifests' in stdout
            assert 'adding file changes' in stdout
            assert stderr == ''

    @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks
    def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, vt):
        """A failing preoutgoing hook makes a clone of the fork fail."""
        # set preoutgoing to failing hook (returns True)
        db.Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
        meta.Session().commit()
        # clone repo
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH) \
            .execute(vt.repo_type, 'clone', clone_url, dest_dir, ignoreReturnCode=True)
        if vt.repo_type == 'hg':
            assert 'preoutgoing.testhook hook failed' in stdout + stderr
        elif vt.repo_type == 'git':
            assert 'error: 406' in stderr

    @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks
    def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, vt):
        """Push against a failing, a raising and finally a passing prechangegroup hook."""
        # set prechangegroup to failing hook (returns exit code 1)
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
        meta.Session().commit()
        # clone repo
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
        dest_dir = _get_tmp_dir()
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url,
                                             ignoreReturnCode=True)
        assert 'failing_test_hook failed' in stdout + stderr
        assert 'Traceback' not in stdout + stderr
        assert 'prechangegroup.testhook hook failed' in stdout + stderr
        # there are still outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout != ''

        # set prechangegroup hook to exception throwing method
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook')
        meta.Session().commit()
        # re-try to push
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
        if vt is HgHttpVcsTest:
            # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned
            assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr
        elif vt is HgSshVcsTest:
            assert 'remote: Exception: exception_test_hook threw an exception' in stdout
        else:
            # Only the two Mercurial variants are parametrized for this test.
            raise AssertionError('unexpected VCS test type: %r' % (vt,))
        # there are still outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout != ''

        # set prechangegroup hook to method that returns False
        db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook')
        meta.Session().commit()
        # re-try to push
        stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
        assert 'passing_test_hook succeeded' in stdout + stderr
        assert 'Traceback' not in stdout + stderr
        assert 'prechangegroup.testhook hook failed' not in stdout + stderr
        # no more outgoing changesets
        stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
        assert stdout == ''
        assert stderr == ''

    def test_add_submodule_git(self, webserver, testfork):
        """Push a commit adding a submodule and check how the files page links it."""
        dest_dir = _get_tmp_dir()
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)

        fork_url = GitHttpVcsTest.repo_url_param(webserver, testfork['git'])

        # add submodule
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', fork_url, dest_dir)
        stdout, stderr = Command(dest_dir).execute('git submodule add', clone_url, 'testsubmodule')
        stdout, stderr = Command(dest_dir).execute('git commit -am "added testsubmodule pointing to', clone_url, '"', EMAIL=base.TEST_USER_ADMIN_EMAIL)
        stdout, stderr = Command(dest_dir).execute('git push', fork_url, 'master')

        # check for testsubmodule link in files page
        self.log_user()
        response = self.app.get(base.url(controller='files', action='index',
                                         repo_name=testfork['git'],
                                         revision='tip',
                                         f_path='/'))
        # check _repo_files_url that will be used to reload as AJAX
        response.mustcontain('var _repo_files_url = ("/%s/files/");' % testfork['git'])

        response.mustcontain('<a class="submodule-dir" href="%s" target="_blank"><i class="icon-file-submodule"></i><span>testsubmodule @ ' % clone_url)

        # check that following a submodule link actually works - and redirects
        response = self.app.get(base.url(controller='files', action='index',
                                         repo_name=testfork['git'],
                                         revision='tip',
                                         f_path='/testsubmodule'),
                                status=302)
        assert response.location == clone_url