##// END OF EJS Templates
tests: add as little code as possible in __init__.py...
tests: add as little code as possible in __init__.py kallithea/tests/__init__.py contained quite a lot of code, including the test base class TestController. This in itself may be considered bad practice. Specifically, this poses a problem when using pytest 3.0+, in which asserts in some files are not automatically rewritten to give improved assert output. That problem can be fixed by explicitly registering such files for assertion rewriting, but that register call should be executed _before_ said files are imported. I.e. if the register call is in kallithea/tests/__init__.py, assert calls in __init__.py itself can not be rewritten. Since the TestController base class does effectively contain asserts, and we do not want to execute the register call from somewhere outside the kallithea/tests directory, we need to move the TestController class to another file (kallithea/tests/base.py) so we can have a register call in __init__.py before loading base.py. While not strictly necessary to fix the mentioned pytest problem, we take the opportunity to fully clean __init__.py and move everything to the new kallithea/tests/base.py. While doing so, unnecessary imports are removed, and imports are ordered alphabetically. Explicit imports of symbols from modules that were already imported as a whole, are removed in favor of fully qualifying the references (e.g. tempfile._RandomNameSequence).

File last commit:

r6180:8d98924c default
r6180:8d98924c default
Show More
manual_test_vcs_operations.py
569 lines | 21.5 KiB | text/x-python | PythonLexer
/ kallithea / tests / other / manual_test_vcs_operations.py
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.tests.other.manual_test_vcs_operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test suite for making push/pull operations.

Run it in two terminals::

    paster serve kallithea/tests/test.ini
    KALLITHEA_WHOOSH_TEST_DISABLE=1 KALLITHEA_NO_TMP_PATH=1 py.test kallithea/tests/other/manual_test_vcs_operations.py

You must have git > 1.8.1 for the tests to work properly.
This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Dec 30, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""
import os
import re
import tempfile
import time
from tempfile import _RandomNameSequence
from subprocess import Popen, PIPE
from kallithea.tests.base import *
from kallithea.tests.fixture import Fixture
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation
from kallithea.model.meta import Session
from kallithea.model.repo import RepoModel
from kallithea.model.user import UserModel
# When true, Command.execute() prints every command line it runs together
# with any non-empty stdout / stderr it captured.
DEBUG = True
# host:port part of the URLs built by _construct_url()
HOST = '127.0.0.1:4999' # test host
# Shared fixture helper; the tests in this module use it to create forks.
fixture = Fixture()
class Command(object):
    """Run shell command lines from a fixed working directory."""

    def __init__(self, cwd):
        # Directory in which every command started by this instance runs.
        self.cwd = cwd

    def execute(self, cmd, *args, **environ):
        """
        Run ``cmd`` followed by ``args`` through the system shell.

        :param cmd: leading part of the command line, e.g. ``'hg clone'``
        :param args: further strings, joined with single spaces and appended
            to ``cmd``
        :param environ: extra environment variables for the child process;
            they take precedence over the inherited environment and over
            the ``LANG`` / ``LANGUAGE`` values set here
        :return: ``(stdout, stderr)`` as captured from the child process
        """
        full_cmd = ' '.join([cmd, ' '.join(args)])
        if DEBUG:
            print('*** CMD %s ***' % full_cmd)

        # Copy of the current environment with an English locale pinned,
        # then the caller's overrides on top.
        child_env = dict(os.environ, LANG='en_US.UTF-8', LANGUAGE='en_US:en')
        child_env.update(environ)

        proc = Popen(full_cmd, shell=True, stdout=PIPE, stderr=PIPE,
                     cwd=self.cwd, env=child_env)
        out, err = proc.communicate()

        if DEBUG:
            # Only non-empty streams are echoed.
            for label, data in (('stdout:', out), ('stderr:', err)):
                if data:
                    print('%s %s' % (label, repr(data)))
        return out, err
def _get_tmp_dir():
    """Create a new temporary directory and return its path."""
    tmp_path = tempfile.mkdtemp(prefix='rc_integration_test')
    return tmp_path
def _construct_url(repo, dest=None, **kwargs):
    """
    Build the ``<url> <destination>`` argument string for a clone command.

    :param repo: repository name, used as the path part of the URL
    :param dest: local destination appended after the URL; when ``None`` a
        new temporary directory is created and used
    :param kwargs: overrides for the ``user``, ``passwd``, ``host`` and
        ``cloned_repo`` values; other keys are accepted but not used
    :return: ``'http://user:passwd@host/repo dest'``, or
        ``'http://host/repo dest'`` when the user or the password is empty
    """
    if dest is None:
        # make temp clone
        dest = _get_tmp_dir()
    params = {
        'user': TEST_USER_ADMIN_LOGIN,
        'passwd': TEST_USER_ADMIN_PASS,
        'host': HOST,
        'cloned_repo': repo,
        'dest': dest,
    }
    params.update(**kwargs)
    if params['user'] and params['passwd']:
        template = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s'
    else:
        # URL without credentials.  The host placeholder used to lack its
        # leading '%', which put the literal text "(host)s" into the URL
        # instead of the host name.
        template = 'http://%(host)s/%(cloned_repo)s %(dest)s'
    return template % params
def _add_files_and_push(vcs, DEST, **kwargs):
    """
    Create a file in the working copy at ``DEST``, commit to it and push.

    :param vcs: ``'hg'`` or ``'git'``; selects the commands that are run and
        the default repository to push to
    :param DEST: path of the local working copy to commit into
    :param kwargs: ``files_no`` is the number of append-and-commit rounds
        (default 3) and ``clone_url`` replaces the push target; all keyword
        arguments are also passed on to ``_construct_url``
    :return: ``(stdout, stderr)`` of the push command, or ``(None, None)``
        when ``vcs`` is neither ``'hg'`` nor ``'git'``
    """
    # commit some stuff into this repo
    workdir = os.path.join(DEST)
    runner = Command(workdir)

    # A variant of this name with non-ASCII characters ('%ssetupążźć.py')
    # is currently disabled.
    new_file = os.path.join(workdir, '%ssetup.py' % _RandomNameSequence().next())
    runner.execute('touch %s' % new_file)
    runner.execute('%s add %s' % (vcs, new_file))

    email = 'me@example.com'
    # The non-ASCII author name is not used when os.name is 'nt'.
    author_template = 'User <%s>' if os.name == 'nt' else 'User ǝɯɐᴎ <%s>'
    author = author_template % email

    commit_templates = {
        'hg': """hg commit -m "committed new %s" -u "%s" "%s" """,
        'git': """git commit -m "committed new %s" --author "%s" "%s" """,
    }
    for round_no in xrange(kwargs.get('files_no', 3)):
        cmd = """echo "added_line%s" >> %s""" % (round_no, new_file)
        runner.execute(cmd)
        # For any other vcs value ``cmd`` stays the echo line and is run again.
        if vcs in commit_templates:
            cmd = commit_templates[vcs] % (round_no, author, new_file)
        # git commit needs EMAIL on some machines
        runner.execute(cmd, EMAIL=email)

    # PUSH it back
    default_repos = {'hg': HG_REPO, 'git': GIT_REPO}
    # An empty destination leaves just the URL (plus a trailing space).
    kwargs['dest'] = ''
    push_target = _construct_url(default_repos.get(vcs), **kwargs)
    # An explicit clone_url wins over the constructed one.
    push_target = kwargs.get('clone_url', push_target)

    if vcs == 'hg':
        return runner.execute('hg push --verbose', push_target)
    if vcs == 'git':
        return runner.execute('git push --verbose', push_target + " master")
    return None, None
def set_anonymous_access(enable=True):
    """
    Set the ``active`` flag of the default user and commit the change.

    :param enable: new value for the flag
    :raises Exception: when the user, read back after the commit, does not
        carry the requested value
    """
    default_user = User.get_by_username(User.DEFAULT_USER)
    default_user.active = enable
    Session().add(default_user)
    Session().commit()
    print('\tanonymous access is now: %s' % (enable,))

    # Read the user again to verify that the change was stored.
    stored_state = User.get_by_username(User.DEFAULT_USER).active
    if enable != stored_state:
        raise Exception('Cannot set anonymous access')
#==============================================================================
# TESTS
#==============================================================================
def _check_proper_git_push(stdout, stderr):
    """
    Assert that the captured output of ``git push`` shows a successful push.

    :param stdout: captured standard output (not inspected)
    :param stderr: captured standard error; all checks are made against it
    :raises AssertionError: when a failure marker is present or an expected
        marker is missing
    """
    # The messages checked here are looked for on stderr, not stdout.
    for failure_marker in ('fatal', 'rejected'):
        assert failure_marker not in stderr
    for success_marker in ('Pushing to', 'master -> master'):
        assert success_marker in stderr
class TestVCSOperations(TestController):
    """
    Clone and push tests that run the ``hg`` and ``git`` command line
    clients against the server at ``HOST``.

    Most scenarios exist as a pair of methods, one per client.
    """

    @classmethod
    def setup_class(cls):
        """Class-level setup: switch anonymous access off."""
        #DISABLE ANONYMOUS ACCESS
        set_anonymous_access(False)

    def setup_method(self, method):
        """Per-test setup: unlock ``GIT_REPO`` and ``HG_REPO`` and disable locking on both."""
        r = Repository.get_by_repo_name(GIT_REPO)
        Repository.unlock(r)
        r.enable_locking = False
        Session().add(r)
        Session().commit()

        r = Repository.get_by_repo_name(HG_REPO)
        Repository.unlock(r)
        r.enable_locking = False
        Session().add(r)
        Session().commit()

    def test_clone_hg_repo_by_admin(self):
        """``hg clone`` with the default (admin) credentials reports its progress on stdout and nothing on stderr."""
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        assert 'requesting all changes' in stdout
        assert 'adding changesets' in stdout
        assert 'adding manifests' in stdout
        assert 'adding file changes' in stdout

        assert stderr == ''

    def test_clone_git_repo_by_admin(self):
        """``git clone`` with the default (admin) credentials prints 'Cloning into' on exactly one of the two streams."""
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        assert 'Cloning into' in stdout + stderr
        # at least one of the two streams must be empty
        assert stderr == '' or stdout == ''

    def test_clone_wrong_credentials_hg(self):
        """``hg clone`` with a wrong password aborts with an authorization failure."""
        clone_url = _construct_url(HG_REPO, passwd='bad!')
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
        assert 'abort: authorization failed' in stderr

    def test_clone_wrong_credentials_git(self):
        """``git clone`` with a wrong password fails with an authentication error."""
        clone_url = _construct_url(GIT_REPO, passwd='bad!')
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
        assert 'fatal: Authentication failed' in stderr

    def test_clone_git_dir_as_hg(self):
        """Cloning the Git repository with ``hg`` yields HTTP 404."""
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
        assert 'HTTP Error 404: Not Found' in stderr

    def test_clone_hg_repo_as_git(self):
        """Cloning the Mercurial repository with ``git`` reports 'not found'."""
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
        assert 'not found' in stderr

    def test_clone_non_existing_path_hg(self):
        """``hg clone`` of the repository name 'trololo' yields HTTP 404."""
        clone_url = _construct_url('trololo')
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
        assert 'HTTP Error 404: Not Found' in stderr

    def test_clone_non_existing_path_git(self):
        """``git clone`` of the repository name 'trololo' reports 'not found'."""
        clone_url = _construct_url('trololo')
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
        assert 'not found' in stderr

    def test_push_new_file_hg(self):
        """Commits made in a clone of ``HG_REPO`` can be pushed to a newly created fork."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
        fixture.create_fork(HG_REPO, fork_name)
        # _construct_url returns "<url> <dest>"; keep only the URL part
        clone_url = _construct_url(fork_name).split()[0]
        stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url)

        assert 'pushing to' in stdout
        assert 'Repository size' in stdout
        assert 'Last revision is now' in stdout

    def test_push_new_file_git(self):
        """Commits made in a clone of ``GIT_REPO`` can be pushed to a newly created fork."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        # commit some stuff into this repo
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
        fixture.create_fork(GIT_REPO, fork_name)
        # _construct_url returns "<url> <dest>"; keep only the URL part
        clone_url = _construct_url(fork_name).split()[0]
        stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url)
        print [(x.repo_full_path,x.repo_path) for x in Repository.get_all()] # TODO: what is this for
        _check_proper_git_push(stdout, stderr)

    def test_push_invalidates_cache_hg(self):
        """After a push to a new fork of ``HG_REPO`` no ``CacheInvalidation`` row is keyed by the fork's name."""
        # make sure an active cache key for HG_REPO exists
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               ==HG_REPO).scalar()
        if not key:
            key = CacheInvalidation(HG_REPO, HG_REPO)

        key.cache_active = True
        Session().add(key)
        Session().commit()

        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
        fixture.create_fork(HG_REPO, fork_name)
        # _construct_url returns "<url> <dest>"; keep only the URL part
        clone_url = _construct_url(fork_name).split()[0]

        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1, clone_url=clone_url)

        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               ==fork_name).all()
        assert key == []

    def test_push_invalidates_cache_git(self):
        """After a push to a new fork of ``GIT_REPO`` no ``CacheInvalidation`` row is keyed by the fork's name."""
        # make sure an active cache key for GIT_REPO exists
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               ==GIT_REPO).scalar()
        if not key:
            key = CacheInvalidation(GIT_REPO, GIT_REPO)

        key.cache_active = True
        Session().add(key)
        Session().commit()

        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        # commit some stuff into this repo
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
        fixture.create_fork(GIT_REPO, fork_name)
        # _construct_url returns "<url> <dest>"; keep only the URL part
        clone_url = _construct_url(fork_name).split()[0]

        stdout, stderr = _add_files_and_push('git', DEST, files_no=1, clone_url=clone_url)
        _check_proper_git_push(stdout, stderr)

        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               ==fork_name).all()
        assert key == []

    def test_push_wrong_credentials_hg(self):
        """``hg push`` with unknown credentials aborts with an authorization failure."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
                                             passwd='name')

        assert 'abort: authorization failed' in stderr

    def test_push_wrong_credentials_git(self):
        """``git push`` with unknown credentials fails with an authentication error."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
                                             passwd='name')

        assert 'fatal: Authentication failed' in stderr

    def test_push_back_to_wrong_url_hg(self):
        """``hg push`` to the URL path ``/tmp`` on the test host yields HTTP 404."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        stdout, stderr = _add_files_and_push('hg', DEST,
                                    clone_url='http://%s/tmp' % HOST)

        assert 'HTTP Error 404: Not Found' in stderr

    def test_push_back_to_wrong_url_git(self):
        """``git push`` to the URL path ``/tmp`` on the test host reports 'not found'."""
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        stdout, stderr = _add_files_and_push('git', DEST,
                                    clone_url='http://%s/tmp' % HOST)

        assert 'not found' in stderr

    def test_clone_and_create_lock_hg(self):
        """With locking enabled, ``hg clone`` by the admin leaves ``HG_REPO`` locked by the admin's user id."""
        # enable locking
        r = Repository.get_by_repo_name(HG_REPO)
        r.enable_locking = True
        Session().add(r)
        Session().commit()
        # clone
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        #check if lock was made
        r = Repository.get_by_repo_name(HG_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_and_create_lock_git(self):
        """With locking enabled, ``git clone`` by the admin leaves ``GIT_REPO`` locked by the admin's user id."""
        # enable locking
        r = Repository.get_by_repo_name(GIT_REPO)
        r.enable_locking = True
        Session().add(r)
        Session().commit()
        # clone
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        #check if lock was made
        r = Repository.get_by_repo_name(GIT_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_after_repo_was_locked_hg(self):
        """``hg clone`` of ``HG_REPO`` after it was locked with the admin's user id aborts with HTTP 423."""
        #lock repo
        r = Repository.get_by_repo_name(HG_REPO)
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
        #pull fails since repo is locked
        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
        assert msg in stderr

    def test_clone_after_repo_was_locked_git(self):
        """``git clone`` of ``GIT_REPO`` after it was locked with the admin's user id reports error 423."""
        #lock repo
        r = Repository.get_by_repo_name(GIT_REPO)
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
        #pull fails since repo is locked
        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
        msg = ("""The requested URL returned error: 423""")
        assert msg in stderr

    def test_push_on_locked_repo_by_other_user_hg(self):
        """``hg push`` by the regular user is refused with HTTP 423 while the admin holds the lock."""
        #clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(HG_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        #lock repo
        r = Repository.get_by_repo_name(HG_REPO)
        # let this user actually push !
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)

        #push fails repo is locked by other user !
        stdout, stderr = _add_files_and_push('hg', DEST,
                                             user=TEST_USER_REGULAR_LOGIN,
                                             passwd=TEST_USER_REGULAR_PASS)
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
        assert msg in stderr

    def test_push_on_locked_repo_by_other_user_git(self):
        """``git push`` by the regular user reports the lock message while the admin holds the lock."""
        #clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(GIT_REPO, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        #lock repo
        r = Repository.get_by_repo_name(GIT_REPO)
        # let this user actually push !
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)

        #push fails repo is locked by other user !
        stdout, stderr = _add_files_and_push('git', DEST,
                                             user=TEST_USER_REGULAR_LOGIN,
                                             passwd=TEST_USER_REGULAR_PASS)
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
        assert err in stderr

        #TODO: fix this somehow later on Git; even if we send back 423,
        #Git makes ANOTHER request and we fail there with 405 :/
        # NOTE: ``msg`` is currently unused because the assert below is disabled.
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
        #msg = "405 Method Not Allowed"
        #assert msg in stderr

    def test_push_unlocks_repository_hg(self):
        """Cloning a fork with locking enabled locks it for the admin; a following ``hg push`` releases the lock."""
        # enable locking
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
        fixture.create_fork(HG_REPO, fork_name)
        r = Repository.get_by_repo_name(fork_name)
        r.enable_locking = True
        Session().add(r)
        Session().commit()
        #clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(fork_name, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        #check for lock repo after clone
        r = Repository.get_by_repo_name(fork_name)
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
        assert r.locked[0] == uid

        #push is ok and repo is now unlocked
        stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url.split()[0])
        assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
        #we need to cleanup the Session Here !
        Session.remove()
        r = Repository.get_by_repo_name(fork_name)
        assert r.locked == [None, None]

    #TODO: fix me ! somehow during tests hooks don't get called on Git
    def test_push_unlocks_repository_git(self):
        """Cloning a fork with locking enabled locks it for the admin; a following ``git push`` releases the lock."""
        # enable locking
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
        fixture.create_fork(GIT_REPO, fork_name)
        r = Repository.get_by_repo_name(fork_name)
        r.enable_locking = True
        Session().add(r)
        Session().commit()
        #clone some temp
        DEST = _get_tmp_dir()
        clone_url = _construct_url(fork_name, dest=DEST)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        #check for lock repo after clone
        r = Repository.get_by_repo_name(fork_name)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

        #push is ok and repo is now unlocked
        stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url.split()[0])
        _check_proper_git_push(stdout, stderr)

        assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
        #we need to cleanup the Session Here !
        Session.remove()
        r = Repository.get_by_repo_name(fork_name)
        assert r.locked == [None, None]

    def test_ip_restriction_hg(self):
        """
        After the extra IP ``10.10.10.10/32`` is added for the admin,
        ``hg clone`` gets HTTP 403; once all ``UserIpMap`` entries are
        deleted and the caches invalidated, cloning works again.
        """
        user_model = UserModel()
        try:
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            clone_url = _construct_url(HG_REPO)
            stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
            assert 'abort: HTTP Error 403: Forbidden' in stderr
        finally:
            #release IP restrictions
            for ip in UserIpMap.get_all():
                UserIpMap.delete(ip.ip_id)
            Session().commit()

        # IP permissions are cached, need to invalidate this cache explicitly
        invalidate_all_caches()

        clone_url = _construct_url(HG_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)

        assert 'requesting all changes' in stdout
        assert 'adding changesets' in stdout
        assert 'adding manifests' in stdout
        assert 'adding file changes' in stdout

        assert stderr == ''

    def test_ip_restriction_git(self):
        """
        After the extra IP ``10.10.10.10/32`` is added for the admin,
        ``git clone`` reports a 403; once all ``UserIpMap`` entries are
        deleted and the caches invalidated, cloning works again.
        """
        user_model = UserModel()
        try:
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            clone_url = _construct_url(GIT_REPO)
            stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
            # The message apparently changed in Git 1.8.3, so match it loosely.
            assert re.search(r'\b403\b', stderr)

        finally:
            #release IP restrictions
            for ip in UserIpMap.get_all():
                UserIpMap.delete(ip.ip_id)
            Session().commit()

        # IP permissions are cached, need to invalidate this cache explicitly
        invalidate_all_caches()

        clone_url = _construct_url(GIT_REPO)
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)

        assert 'Cloning into' in stdout + stderr
        # at least one of the two streams must be empty
        assert stderr == '' or stdout == ''