test_vcs_operations.py
470 lines
| 17.1 KiB
| text/x-python
|
PythonLexer
r2752 | # -*- coding: utf-8 -*- | |||
""" | ||||
rhodecode.tests.test_scm_operations | ||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
Test suite for making push/pull operations. | ||||
Run using:: | ||||
RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py | ||||
:created_on: Dec 30, 2010 | ||||
:author: marcink | ||||
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> | ||||
:license: GPLv3, see COPYING for more details. | ||||
""" | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
import os | ||||
import tempfile | ||||
import unittest | ||||
r3180 | import time | |||
r2752 | from os.path import join as jn | |||
from os.path import dirname as dn | ||||
from tempfile import _RandomNameSequence | ||||
from subprocess import Popen, PIPE | ||||
from rhodecode.tests import * | ||||
r3180 | from rhodecode.model.db import User, Repository, UserLog, UserIpMap | |||
r2752 | from rhodecode.model.meta import Session | |||
from rhodecode.model.repo import RepoModel | ||||
r3131 | from rhodecode.model.user import UserModel | |||
r2752 | ||||
DEBUG = True | ||||
HOST = '127.0.0.1:5000' # test host | ||||
class Command(object): | ||||
def __init__(self, cwd): | ||||
self.cwd = cwd | ||||
def execute(self, cmd, *args): | ||||
""" | ||||
Runs command on the system with given ``args``. | ||||
""" | ||||
command = cmd + ' ' + ' '.join(args) | ||||
if DEBUG: | ||||
print '*** CMD %s ***' % command | ||||
p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) | ||||
stdout, stderr = p.communicate() | ||||
if DEBUG: | ||||
print stdout, stderr | ||||
return stdout, stderr | ||||
def _get_tmp_dir(): | ||||
return tempfile.mkdtemp(prefix='rc_integration_test') | ||||
def _construct_url(repo, dest=None, **kwargs): | ||||
if dest is None: | ||||
#make temp clone | ||||
dest = _get_tmp_dir() | ||||
params = { | ||||
'user': TEST_USER_ADMIN_LOGIN, | ||||
'passwd': TEST_USER_ADMIN_PASS, | ||||
'host': HOST, | ||||
'cloned_repo': repo, | ||||
'dest': dest | ||||
} | ||||
params.update(**kwargs) | ||||
if params['user'] and params['passwd']: | ||||
_url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params | ||||
else: | ||||
_url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params | ||||
return _url | ||||
def _add_files_and_push(vcs, DEST, **kwargs): | ||||
""" | ||||
Generate some files, add it to DEST repo and push back | ||||
vcs is git or hg and defines what VCS we want to make those files for | ||||
:param vcs: | ||||
:param DEST: | ||||
""" | ||||
# commit some stuff into this repo | ||||
cwd = path = jn(DEST) | ||||
#added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) | ||||
added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) | ||||
Command(cwd).execute('touch %s' % added_file) | ||||
Command(cwd).execute('%s add %s' % (vcs, added_file)) | ||||
for i in xrange(3): | ||||
cmd = """echo 'added_line%s' >> %s""" % (i, added_file) | ||||
Command(cwd).execute(cmd) | ||||
if vcs == 'hg': | ||||
cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % ( | ||||
i, 'Marcin Kuźminski <marcin@python-blog.com>', added_file | ||||
) | ||||
elif vcs == 'git': | ||||
r3199 | cmd = """git commit -m 'commited new %s' --author '%s' %s """ % ( | |||
r2752 | i, 'Marcin Kuźminski <marcin@python-blog.com>', added_file | |||
) | ||||
Command(cwd).execute(cmd) | ||||
# PUSH it back | ||||
if vcs == 'hg': | ||||
_REPO = HG_REPO | ||||
elif vcs == 'git': | ||||
_REPO = GIT_REPO | ||||
kwargs['dest'] = '' | ||||
clone_url = _construct_url(_REPO, **kwargs) | ||||
if 'clone_url' in kwargs: | ||||
clone_url = kwargs['clone_url'] | ||||
if vcs == 'hg': | ||||
stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) | ||||
elif vcs == 'git': | ||||
stdout, stderr = Command(cwd).execute('git push', clone_url + " master") | ||||
return stdout, stderr | ||||
def set_anonymous_access(enable=True): | ||||
user = User.get_by_username(User.DEFAULT_USER) | ||||
user.active = enable | ||||
Session().add(user) | ||||
Session().commit() | ||||
print '\tanonymous access is now:', enable | ||||
if enable != User.get_by_username(User.DEFAULT_USER).active: | ||||
raise Exception('Cannot set anonymous access') | ||||
#============================================================================== | ||||
# TESTS | ||||
#============================================================================== | ||||
class TestVCSOperations(unittest.TestCase): | ||||
@classmethod | ||||
def setup_class(cls): | ||||
#DISABLE ANONYMOUS ACCESS | ||||
set_anonymous_access(False) | ||||
def setUp(self): | ||||
r = Repository.get_by_repo_name(GIT_REPO) | ||||
Repository.unlock(r) | ||||
r.enable_locking = False | ||||
Session().add(r) | ||||
Session().commit() | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
Repository.unlock(r) | ||||
r.enable_locking = False | ||||
Session().add(r) | ||||
Session().commit() | ||||
def test_clone_hg_repo_by_admin(self): | ||||
clone_url = _construct_url(HG_REPO) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
assert 'requesting all changes' in stdout | ||||
assert 'adding changesets' in stdout | ||||
assert 'adding manifests' in stdout | ||||
assert 'adding file changes' in stdout | ||||
assert stderr == '' | ||||
def test_clone_git_repo_by_admin(self): | ||||
clone_url = _construct_url(GIT_REPO) | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
assert 'Cloning into' in stdout | ||||
assert stderr == '' | ||||
def test_clone_wrong_credentials_hg(self): | ||||
clone_url = _construct_url(HG_REPO, passwd='bad!') | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
assert 'abort: authorization failed' in stderr | ||||
def test_clone_wrong_credentials_git(self): | ||||
clone_url = _construct_url(GIT_REPO, passwd='bad!') | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
assert 'fatal: Authentication failed' in stderr | ||||
def test_clone_git_dir_as_hg(self): | ||||
clone_url = _construct_url(GIT_REPO) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
assert 'HTTP Error 404: Not Found' in stderr | ||||
def test_clone_hg_repo_as_git(self): | ||||
clone_url = _construct_url(HG_REPO) | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
assert 'not found:' in stderr | ||||
def test_clone_non_existing_path_hg(self): | ||||
clone_url = _construct_url('trololo') | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
assert 'HTTP Error 404: Not Found' in stderr | ||||
def test_clone_non_existing_path_git(self): | ||||
clone_url = _construct_url('trololo') | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
assert 'not found:' in stderr | ||||
def test_push_new_file_hg(self): | ||||
DEST = _get_tmp_dir() | ||||
clone_url = _construct_url(HG_REPO, dest=DEST) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
stdout, stderr = _add_files_and_push('hg', DEST) | ||||
assert 'pushing to' in stdout | ||||
assert 'Repository size' in stdout | ||||
assert 'Last revision is now' in stdout | ||||
def test_push_new_file_git(self): | ||||
DEST = _get_tmp_dir() | ||||
clone_url = _construct_url(GIT_REPO, dest=DEST) | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
# commit some stuff into this repo | ||||
stdout, stderr = _add_files_and_push('git', DEST) | ||||
#WTF git stderr ?! | ||||
assert 'master -> master' in stderr | ||||
def test_push_wrong_credentials_hg(self): | ||||
DEST = _get_tmp_dir() | ||||
clone_url = _construct_url(HG_REPO, dest=DEST) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
stdout, stderr = _add_files_and_push('hg', DEST, user='bad', | ||||
passwd='name') | ||||
assert 'abort: authorization failed' in stderr | ||||
def test_push_wrong_credentials_git(self): | ||||
DEST = _get_tmp_dir() | ||||
clone_url = _construct_url(GIT_REPO, dest=DEST) | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
stdout, stderr = _add_files_and_push('git', DEST, user='bad', | ||||
passwd='name') | ||||
assert 'fatal: Authentication failed' in stderr | ||||
def test_push_back_to_wrong_url_hg(self): | ||||
DEST = _get_tmp_dir() | ||||
clone_url = _construct_url(HG_REPO, dest=DEST) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
stdout, stderr = _add_files_and_push('hg', DEST, | ||||
clone_url='http://127.0.0.1:5000/tmp',) | ||||
assert 'HTTP Error 404: Not Found' in stderr | ||||
def test_push_back_to_wrong_url_git(self): | ||||
DEST = _get_tmp_dir() | ||||
clone_url = _construct_url(GIT_REPO, dest=DEST) | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
stdout, stderr = _add_files_and_push('git', DEST, | ||||
clone_url='http://127.0.0.1:5000/tmp',) | ||||
assert 'not found:' in stderr | ||||
def test_clone_and_create_lock_hg(self): | ||||
# enable locking | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
r.enable_locking = True | ||||
Session().add(r) | ||||
Session().commit() | ||||
# clone | ||||
clone_url = _construct_url(HG_REPO) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
#check if lock was made | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id | ||||
def test_clone_and_create_lock_git(self): | ||||
# enable locking | ||||
r = Repository.get_by_repo_name(GIT_REPO) | ||||
r.enable_locking = True | ||||
Session().add(r) | ||||
Session().commit() | ||||
# clone | ||||
clone_url = _construct_url(GIT_REPO) | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
#check if lock was made | ||||
r = Repository.get_by_repo_name(GIT_REPO) | ||||
assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id | ||||
def test_clone_after_repo_was_locked_hg(self): | ||||
#lock repo | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) | ||||
#pull fails since repo is locked | ||||
clone_url = _construct_url(HG_REPO) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" | ||||
% (HG_REPO, TEST_USER_ADMIN_LOGIN)) | ||||
assert msg in stderr | ||||
def test_clone_after_repo_was_locked_git(self): | ||||
#lock repo | ||||
r = Repository.get_by_repo_name(GIT_REPO) | ||||
Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) | ||||
#pull fails since repo is locked | ||||
clone_url = _construct_url(GIT_REPO) | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
r2957 | msg = ("""423 Repository `%s` locked by user `%s`""" | |||
r2752 | % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) | |||
assert msg in stderr | ||||
def test_push_on_locked_repo_by_other_user_hg(self): | ||||
#clone some temp | ||||
DEST = _get_tmp_dir() | ||||
clone_url = _construct_url(HG_REPO, dest=DEST) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
#lock repo | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
# let this user actually push ! | ||||
RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, | ||||
perm='repository.write') | ||||
Session().commit() | ||||
Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) | ||||
#push fails repo is locked by other user ! | ||||
stdout, stderr = _add_files_and_push('hg', DEST, | ||||
user=TEST_USER_REGULAR_LOGIN, | ||||
passwd=TEST_USER_REGULAR_PASS) | ||||
msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" | ||||
% (HG_REPO, TEST_USER_ADMIN_LOGIN)) | ||||
assert msg in stderr | ||||
#TODO: fix me ! somehow during tests hooks don't get called on GIT | ||||
# def test_push_on_locked_repo_by_other_user_git(self): | ||||
# #clone some temp | ||||
# DEST = _get_tmp_dir() | ||||
# clone_url = _construct_url(GIT_REPO, dest=DEST) | ||||
# stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
# | ||||
# #lock repo | ||||
# r = Repository.get_by_repo_name(GIT_REPO) | ||||
# # let this user actually push ! | ||||
# RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, | ||||
# perm='repository.write') | ||||
# Session().commit() | ||||
# Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) | ||||
# | ||||
# #push fails repo is locked by other user ! | ||||
# stdout, stderr = _add_files_and_push('git', DEST, | ||||
# user=TEST_USER_REGULAR_LOGIN, | ||||
# passwd=TEST_USER_REGULAR_PASS) | ||||
# msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" | ||||
# % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) | ||||
# #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw | ||||
# # back 423 to it, it makes ANOTHER request and we fail there with 405 :/ | ||||
# msg = "405 Method Not Allowed" | ||||
# assert msg in stderr | ||||
def test_push_unlocks_repository_hg(self): | ||||
# enable locking | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
r.enable_locking = True | ||||
Session().add(r) | ||||
Session().commit() | ||||
#clone some temp | ||||
DEST = _get_tmp_dir() | ||||
clone_url = _construct_url(HG_REPO, dest=DEST) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
#check for lock repo after clone | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id | ||||
#push is ok and repo is now unlocked | ||||
stdout, stderr = _add_files_and_push('hg', DEST) | ||||
assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout | ||||
#we need to cleanup the Session Here ! | ||||
Session.remove() | ||||
r = Repository.get_by_repo_name(HG_REPO) | ||||
assert r.locked == [None, None] | ||||
#TODO: fix me ! somehow during tests hooks don't get called on GIT | ||||
# def test_push_unlocks_repository_git(self): | ||||
# # enable locking | ||||
# r = Repository.get_by_repo_name(GIT_REPO) | ||||
# r.enable_locking = True | ||||
# Session().add(r) | ||||
# Session().commit() | ||||
# #clone some temp | ||||
# DEST = _get_tmp_dir() | ||||
# clone_url = _construct_url(GIT_REPO, dest=DEST) | ||||
# stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
# | ||||
# #check for lock repo after clone | ||||
# r = Repository.get_by_repo_name(GIT_REPO) | ||||
# assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id | ||||
# | ||||
# #push is ok and repo is now unlocked | ||||
# stdout, stderr = _add_files_and_push('git', DEST) | ||||
# #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout | ||||
# #we need to cleanup the Session Here ! | ||||
# Session.remove() | ||||
# r = Repository.get_by_repo_name(GIT_REPO) | ||||
# assert r.locked == [None, None] | ||||
r3131 | ||||
def test_ip_restriction_hg(self): | ||||
user_model = UserModel() | ||||
r3180 | try: | |||
user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') | ||||
Session().commit() | ||||
clone_url = _construct_url(HG_REPO) | ||||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
assert 'abort: HTTP Error 403: Forbidden' in stderr | ||||
finally: | ||||
#release IP restrictions | ||||
for ip in UserIpMap.getAll(): | ||||
UserIpMap.delete(ip.ip_id) | ||||
Session().commit() | ||||
time.sleep(2) | ||||
r3131 | clone_url = _construct_url(HG_REPO) | |||
stdout, stderr = Command('/tmp').execute('hg clone', clone_url) | ||||
assert 'requesting all changes' in stdout | ||||
assert 'adding changesets' in stdout | ||||
assert 'adding manifests' in stdout | ||||
assert 'adding file changes' in stdout | ||||
assert stderr == '' | ||||
def test_ip_restriction_git(self): | ||||
user_model = UserModel() | ||||
r3180 | try: | |||
user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') | ||||
Session().commit() | ||||
clone_url = _construct_url(GIT_REPO) | ||||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
assert 'error: The requested URL returned error: 403 Forbidden' in stderr | ||||
finally: | ||||
#release IP restrictions | ||||
for ip in UserIpMap.getAll(): | ||||
UserIpMap.delete(ip.ip_id) | ||||
Session().commit() | ||||
time.sleep(2) | ||||
r3131 | clone_url = _construct_url(GIT_REPO) | |||
stdout, stderr = Command('/tmp').execute('git clone', clone_url) | ||||
assert 'Cloning into' in stdout | ||||
assert stderr == '' | ||||