test_scm_operations.py
242 lines
| 7.0 KiB
| text/x-python
|
PythonLexer
# -*- coding: utf-8 -*-
"""
    rhodecode.tests.test_scm_operations
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    Test suite for making push/pull operations.
    Run using::

     RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py

    :created_on: Dec 30, 2010
    :author: marcink
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
import os | ||||
r2728 | import tempfile | |||
r2527 | from os.path import join as jn | |||
from os.path import dirname as dn | ||||
from tempfile import _RandomNameSequence | ||||
from subprocess import Popen, PIPE | ||||
r2728 | from rhodecode.tests import * | |||
r2527 | from rhodecode.model.db import User, Repository, UserLog | |||
r2728 | from rhodecode.model.meta import Session | |||
r2527 | ||||
# When true, Command.execute prints every command line it runs together with
# the captured stdout/stderr of that command.
DEBUG = True
# host:port part used when building the clone/push urls
HOST = '127.0.0.1:5000'  # test host
r2527 | ||||
class Command(object): | ||||
def __init__(self, cwd): | ||||
self.cwd = cwd | ||||
def execute(self, cmd, *args): | ||||
r2728 | """ | |||
Runs command on the system with given ``args``. | ||||
r2527 | """ | |||
command = cmd + ' ' + ' '.join(args) | ||||
if DEBUG: | ||||
r2728 | print '*** CMD %s ***' % command | |||
r2527 | p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) | |||
stdout, stderr = p.communicate() | ||||
if DEBUG: | ||||
print stdout, stderr | ||||
return stdout, stderr | ||||
def _get_tmp_dir():
    """
    Creates a new temporary directory (name prefixed with
    ``rc_integration_test``) and returns its path.
    """
    tmp_path = tempfile.mkdtemp(prefix='rc_integration_test')
    return tmp_path
r2527 | ||||
def _construct_url(repo, dest=None, **kwargs):
    """
    Builds the ``<url> <destination>`` string that is appended to clone/push
    commands.

    :param repo: repository name, used as the path part of the url
    :param dest: local destination; when ``None`` a fresh temp dir is created
    :param kwargs: overrides for the template values ``user``, ``passwd``,
        ``host``, ``cloned_repo`` and ``dest``
    :returns: url with embedded credentials when both ``user`` and ``passwd``
        are non-empty, otherwise a url without credentials; in both cases
        followed by a space and the destination
    """
    if dest is None:
        # make temp clone
        dest = _get_tmp_dir()
    params = {
        'user': TEST_USER_ADMIN_LOGIN,
        'passwd': TEST_USER_ADMIN_PASS,
        'host': HOST,
        'cloned_repo': repo,
        'dest': dest
    }
    params.update(**kwargs)
    if params['user'] and params['passwd']:
        template = ('http://%(user)s:%(passwd)s@%(host)s/'
                    '%(cloned_repo)s %(dest)s')
    else:
        # no credentials: the '%' before '(host)s' was missing here, which
        # left the literal text '(host)s' in the url instead of the host
        template = 'http://%(host)s/%(cloned_repo)s %(dest)s'
    return template % params
r2527 | ||||
def set_anonymous_access(enable=True): | ||||
r2728 | user = User.get_by_username(User.DEFAULT_USER) | |||
r2527 | user.active = enable | |||
r2728 | Session().add(user) | |||
Session().commit() | ||||
r2527 | print '\tanonymous access is now:', enable | |||
r2728 | if enable != User.get_by_username(User.DEFAULT_USER).active: | |||
r2527 | raise Exception('Cannot set anonymous access') | |||
def setup_module():
    """
    Module-level setup hook: switches anonymous access off.
    """
    # DISABLE ANONYMOUS ACCESS
    set_anonymous_access(enable=False)
def test_clone_hg_repo_by_admin():
    """
    ``hg clone`` of HG_REPO with the default (admin) credentials reports all
    clone phases on stdout and writes nothing to stderr.
    """
    url_and_dest = _construct_url(HG_REPO)
    out, err = Command('/tmp').execute('hg clone', url_and_dest)

    expected_messages = (
        'requesting all changes',
        'adding changesets',
        'adding manifests',
        'adding file changes',
    )
    for message in expected_messages:
        assert message in out
    assert err == ''
r2527 | ||||
def test_clone_git_repo_by_admin():
    """
    ``git clone`` of GIT_REPO with the default (admin) credentials announces
    the clone on stdout and writes nothing to stderr.
    """
    url_and_dest = _construct_url(GIT_REPO)
    out, err = Command('/tmp').execute('git clone', url_and_dest)

    assert 'Cloning into' in out
    assert err == ''
r2527 | ||||
def test_clone_wrong_credentials_hg():
    """
    ``hg clone`` with a bad password aborts with an authorization error.
    """
    url_and_dest = _construct_url(HG_REPO, passwd='bad!')
    _out, err = Command('/tmp').execute('hg clone', url_and_dest)

    assert 'abort: authorization failed' in err
r2527 | ||||
r2728 | ||||
def test_clone_wrong_credentials_git():
    """
    ``git clone`` with a bad password fails with an authentication error.
    """
    url_and_dest = _construct_url(GIT_REPO, passwd='bad!')
    _out, err = Command('/tmp').execute('git clone', url_and_dest)

    assert 'fatal: Authentication failed' in err
r2527 | ||||
def test_clone_git_dir_as_hg():
    """
    Cloning GIT_REPO with ``hg clone`` ends with a 404 error on stderr.
    """
    url_and_dest = _construct_url(GIT_REPO)
    _out, err = Command('/tmp').execute('hg clone', url_and_dest)

    assert 'HTTP Error 404: Not Found' in err
r2527 | ||||
def test_clone_hg_repo_as_git():
    """
    Cloning HG_REPO with ``git clone`` ends with git's "not found" hint on
    stderr.
    """
    url_and_dest = _construct_url(HG_REPO)
    _out, err = Command('/tmp').execute('git clone', url_and_dest)

    expected = 'not found: did you run git update-server-info on the server'
    assert expected in err
r2527 | ||||
def test_clone_non_existing_path_hg():
    """
    ``hg clone`` of the repository name 'trololo' ends with a 404 error on
    stderr.
    """
    url_and_dest = _construct_url('trololo')
    _out, err = Command('/tmp').execute('hg clone', url_and_dest)

    assert 'HTTP Error 404: Not Found' in err
r2527 | ||||
def test_clone_non_existing_path_git():
    """
    ``git clone`` of the repository name 'trololo' ends with git's
    "not found" hint on stderr.
    """
    url_and_dest = _construct_url('trololo')
    _out, err = Command('/tmp').execute('git clone', url_and_dest)

    expected = 'not found: did you run git update-server-info on the server'
    assert expected in err
r2527 | ||||
def test_push_new_file_hg():
    """
    Clones HG_REPO into a temp dir, adds a new file, commits it three times
    with a growing content and pushes the result back to the server.
    """
    clone_dir = _get_tmp_dir()
    Command('/tmp').execute('hg clone', _construct_url(HG_REPO, dest=clone_dir))

    # commit some stuff into this repo
    work_dir = jn(clone_dir)
    runner = Command(work_dir)
    new_file = jn(work_dir, '%ssetupążźć.py' % _RandomNameSequence().next())
    runner.execute('touch %s' % new_file)
    runner.execute('hg add %s' % new_file)

    author = 'Marcin Kuźminski <marcin@python-blog.com>'
    for rev in xrange(3):
        # append a line, then commit only that file
        runner.execute("""echo 'added_line%s' >> %s""" % (rev, new_file))
        runner.execute(
            """hg ci -m 'commited new %s' -u '%s' %s """ % (
                rev, author, new_file
            )
        )

    # PUSH it back; empty dest leaves just the url (plus a trailing space)
    push_url = _construct_url(HG_REPO, dest='')
    out, _err = runner.execute('hg push --verbose', push_url)

    for marker in ('pushing to', 'Repository size', 'Last revision is now'):
        assert marker in out
r2527 | ||||
def test_push_new_file_git():
    """
    Clones GIT_REPO into a temp dir, adds a new file, commits it three times
    with a growing content and pushes the result back to the server.
    """
    DEST = _get_tmp_dir()
    clone_url = _construct_url(GIT_REPO, dest=DEST)
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)

    # commit some stuff into this repo
    cwd = path = jn(DEST)
    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
    Command(cwd).execute('touch %s' % added_file)
    Command(cwd).execute('git add %s' % added_file)

    for i in xrange(3):
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
        Command(cwd).execute(cmd)
        # 'git commit', not 'git ci': unlike hg, git has no built-in 'ci'
        # command, so 'git ci' only works where the user defined that alias
        cmd = """git commit -m 'commited new %s' --author '%s' %s """ % (
            i,
            'Marcin Kuźminski <marcin@python-blog.com>',
            added_file
        )
        Command(cwd).execute(cmd)

    # PUSH it back; empty dest leaves just the url (plus a trailing space)
    clone_url = _construct_url(GIT_REPO, dest='')
    stdout, stderr = Command(cwd).execute('git push --verbose', clone_url)

    # the pushed ref summary is looked up in stderr, not stdout
    assert 'master -> master' in stderr
r2527 | ||||
r2728 | def test_push_modify_existing_file_hg(): | |||
assert 0 | ||||
r2527 | ||||
r2728 | def test_push_modify_existing_file_git(): | |||
assert 0 | ||||
def test_push_wrong_credentials_hg(): | ||||
assert 0 | ||||
r2527 | ||||
r2728 | def test_push_wrong_credentials_git(): | |||
assert 0 | ||||
def test_push_back_to_wrong_url_hg(): | ||||
assert 0 | ||||
r2527 | ||||
r2728 | def test_push_back_to_wrong_url_git(): | |||
assert 0 | ||||
r2527 | ||||
# TODO: write all locking tests