test_hg_operations.py
378 lines
| 10.6 KiB
| text/x-python
|
PythonLexer
r896 | # -*- coding: utf-8 -*- | ||
""" | |||
rhodecode.tests.test_hg_operations | |||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |||
Test suite for making push/pull operations | |||
r1203 | |||
r896 | :created_on: Dec 30, 2010 | ||
:copyright: (c) 2010 by marcink. | |||
:license: LICENSE_NAME, see LICENSE_FILE for more details. | |||
""" | |||
import os | |||
r1498 | import time | ||
import sys | |||
r896 | import shutil | ||
import logging | |||
r1498 | |||
r930 | from os.path import join as jn | ||
r1277 | from os.path import dirname as dn | ||
r896 | |||
r930 | from tempfile import _RandomNameSequence | ||
r896 | from subprocess import Popen, PIPE | ||
r930 | from paste.deploy import appconfig | ||
from pylons import config | |||
from sqlalchemy import engine_from_config | |||
from rhodecode.lib.utils import add_cache | |||
from rhodecode.model import init_model | |||
from rhodecode.model import meta | |||
r1008 | from rhodecode.model.db import User, Repository | ||
r930 | from rhodecode.lib.auth import get_crypt_password | ||
r896 | |||
from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO | |||
r930 | from rhodecode.config.environment import load_environment | ||
r1277 | rel_path = dn(dn(dn(os.path.abspath(__file__)))) | ||
conf = appconfig('config:development.ini', relative_to=rel_path) | |||
r930 | load_environment(conf.global_conf, conf.local_conf) | ||
add_cache(conf) | |||
r896 | |||
USER = 'test_admin' | |||
PASS = 'test12' | |||
HOST = '127.0.0.1:5000' | |||
r1498 | DEBUG = bool(int(sys.argv[1])) | ||
print 'DEBUG:',DEBUG | |||
r896 | log = logging.getLogger(__name__) | ||
r909 | |||
class Command(object): | |||
def __init__(self, cwd): | |||
self.cwd = cwd | |||
r896 | |||
r909 | def execute(self, cmd, *args): | ||
"""Runs command on the system with given ``args``. | |||
""" | |||
command = cmd + ' ' + ' '.join(args) | |||
log.debug('Executing %s' % command) | |||
r910 | if DEBUG: | ||
print command | |||
r909 | p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) | ||
stdout, stderr = p.communicate() | |||
r910 | if DEBUG: | ||
print stdout, stderr | |||
r909 | return stdout, stderr | ||
r896 | |||
r1498 | |||
def test_wrapp(func): | |||
def __wrapp(*args,**kwargs): | |||
print '###%s###' %func.__name__ | |||
try: | |||
res = func(*args,**kwargs) | |||
except: | |||
print '--%s failed--' % func.__name__ | |||
return | |||
print 'ok' | |||
return res | |||
return __wrapp | |||
r930 | def get_session(): | ||
engine = engine_from_config(conf, 'sqlalchemy.db1.') | |||
init_model(engine) | |||
r1498 | sa = meta.Session | ||
r930 | return sa | ||
def create_test_user(force=True): | |||
r1008 | print 'creating test user' | ||
r930 | sa = get_session() | ||
user = sa.query(User).filter(User.username == USER).scalar() | |||
r988 | |||
r1008 | if force and user is not None: | ||
print 'removing current user' | |||
r1047 | for repo in sa.query(Repository).filter(Repository.user == user).all(): | ||
sa.delete(repo) | |||
r930 | sa.delete(user) | ||
sa.commit() | |||
if user is None or force: | |||
r1008 | print 'creating new one' | ||
r930 | new_usr = User() | ||
new_usr.username = USER | |||
new_usr.password = get_crypt_password(PASS) | |||
r988 | new_usr.email = 'mail@mail.com' | ||
new_usr.name = 'test' | |||
new_usr.lastname = 'lasttestname' | |||
r930 | new_usr.active = True | ||
r1008 | new_usr.admin = True | ||
r930 | sa.add(new_usr) | ||
sa.commit() | |||
r1008 | print 'done' | ||
def create_test_repo(force=True): | |||
from rhodecode.model.repo import RepoModel | |||
sa = get_session() | |||
user = sa.query(User).filter(User.username == USER).scalar() | |||
if user is None: | |||
raise Exception('user not found') | |||
r930 | |||
r1008 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() | ||
if repo is None: | |||
print 'repo not found creating' | |||
form_data = {'repo_name':HG_REPO, | |||
'repo_type':'hg', | |||
r1296 | 'private':False, | ||
'clone_uri':'' } | |||
r1008 | rm = RepoModel(sa) | ||
rm.base_path = '/home/hg' | |||
rm.create(form_data, user) | |||
r896 | |||
r1047 | |||
def set_anonymous_access(enable=True): | |||
sa = get_session() | |||
user = sa.query(User).filter(User.username == 'default').one() | |||
user.active = enable | |||
sa.add(user) | |||
sa.commit() | |||
r1498 | sa.remove() | ||
print 'anonymous access is now:',enable | |||
r1047 | |||
def get_anonymous_access(): | |||
sa = get_session() | |||
r1498 | obj1 = sa.query(User).filter(User.username == 'default').one() | ||
sa.expire(obj1) | |||
return obj1.active | |||
r1047 | |||
r910 | #============================================================================== | ||
r896 | # TESTS | ||
r910 | #============================================================================== | ||
r1498 | @test_wrapp | ||
def test_clone_with_credentials(no_errors=False): | |||
r909 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | ||
r896 | try: | ||
r909 | shutil.rmtree(path, ignore_errors=True) | ||
os.makedirs(path) | |||
#print 'made dirs %s' % jn(path) | |||
r896 | except OSError: | ||
r909 | raise | ||
r1498 | print 'checking if anonymous access is enabled' | ||
anonymous_access = get_anonymous_access() | |||
if anonymous_access: | |||
print 'enabled, disabling it ' | |||
set_anonymous_access(enable=False) | |||
time.sleep(1) | |||
r896 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ | ||
{'user':USER, | |||
'pass':PASS, | |||
'host':HOST, | |||
'cloned_repo':HG_REPO, | |||
r909 | 'dest':path} | ||
stdout, stderr = Command(cwd).execute('hg clone', clone_url) | |||
r1008 | if no_errors is False: | ||
assert """adding file changes""" in stdout, 'no messages about cloning' | |||
assert """abort""" not in stderr , 'got error from clone' | |||
r909 | |||
r1498 | @test_wrapp | ||
def test_clone_anonymous(): | |||
r909 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | ||
try: | |||
shutil.rmtree(path, ignore_errors=True) | |||
os.makedirs(path) | |||
#print 'made dirs %s' % jn(path) | |||
except OSError: | |||
raise | |||
r1047 | print 'checking if anonymous access is enabled' | ||
anonymous_access = get_anonymous_access() | |||
if not anonymous_access: | |||
print 'not enabled, enabling it ' | |||
set_anonymous_access(enable=True) | |||
r1498 | time.sleep(1) | ||
r909 | clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ | ||
{'user':USER, | |||
'pass':PASS, | |||
'host':HOST, | |||
'cloned_repo':HG_REPO, | |||
'dest':path} | |||
r896 | |||
r909 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) | ||
r1047 | |||
r909 | assert """adding file changes""" in stdout, 'no messages about cloning' | ||
assert """abort""" not in stderr , 'got error from clone' | |||
r1047 | #disable if it was enabled | ||
if not anonymous_access: | |||
print 'disabling anonymous access' | |||
set_anonymous_access(enable=False) | |||
r1498 | @test_wrapp | ||
r909 | def test_clone_wrong_credentials(): | ||
cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |||
try: | |||
shutil.rmtree(path, ignore_errors=True) | |||
os.makedirs(path) | |||
#print 'made dirs %s' % jn(path) | |||
except OSError: | |||
raise | |||
r1498 | print 'checking if anonymous access is enabled' | ||
anonymous_access = get_anonymous_access() | |||
if anonymous_access: | |||
print 'enabled, disabling it ' | |||
set_anonymous_access(enable=False) | |||
r909 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ | ||
{'user':USER + 'error', | |||
'pass':PASS, | |||
'host':HOST, | |||
'cloned_repo':HG_REPO, | |||
'dest':path} | |||
stdout, stderr = Command(cwd).execute('hg clone', clone_url) | |||
r1498 | if not """abort: authorization failed""" in stderr: | ||
raise Exception('Failure') | |||
r909 | |||
r1498 | @test_wrapp | ||
r896 | def test_pull(): | ||
pass | |||
r1498 | @test_wrapp | ||
r1008 | def test_push_modify_file(f_name='setup.py'): | ||
cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |||
modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) | |||
r896 | for i in xrange(5): | ||
cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) | |||
r909 | Command(cwd).execute(cmd) | ||
r896 | |||
cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) | |||
r909 | Command(cwd).execute(cmd) | ||
r896 | |||
r909 | Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) | ||
r896 | |||
r1498 | @test_wrapp | ||
r1296 | def test_push_new_file(commits=15, with_clone=True): | ||
r909 | |||
r1296 | if with_clone: | ||
r1498 | test_clone_with_credentials(no_errors=True) | ||
r896 | |||
r909 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | ||
r1087 | added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) | ||
r896 | |||
r909 | Command(cwd).execute('touch %s' % added_file) | ||
Command(cwd).execute('hg add %s' % added_file) | |||
r896 | |||
r930 | for i in xrange(commits): | ||
r896 | cmd = """echo 'added_line%s' >> %s""" % (i, added_file) | ||
r909 | Command(cwd).execute(cmd) | ||
r896 | |||
r1296 | cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i, | ||
'Marcin Kuźminski <marcin@python-blog.com>', | |||
added_file) | |||
r909 | Command(cwd).execute(cmd) | ||
r896 | |||
r909 | push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ | ||
{'user':USER, | |||
'pass':PASS, | |||
'host':HOST, | |||
'cloned_repo':HG_REPO, | |||
'dest':jn(TESTS_TMP_PATH, HG_REPO)} | |||
r930 | Command(cwd).execute('hg push --verbose --debug %s' % push_url) | ||
r896 | |||
r1498 | @test_wrapp | ||
r896 | def test_push_wrong_credentials(): | ||
r1008 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | ||
r896 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ | ||
{'user':USER + 'xxx', | |||
'pass':PASS, | |||
'host':HOST, | |||
'cloned_repo':HG_REPO, | |||
'dest':jn(TESTS_TMP_PATH, HG_REPO)} | |||
modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') | |||
for i in xrange(5): | |||
cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) | |||
r909 | Command(cwd).execute(cmd) | ||
r896 | |||
cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file) | |||
r909 | Command(cwd).execute(cmd) | ||
r896 | |||
r909 | Command(cwd).execute('hg push %s' % clone_url) | ||
r896 | |||
r1498 | @test_wrapp | ||
r897 | def test_push_wrong_path(): | ||
r909 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | ||
added_file = jn(path, 'somefile.py') | |||
r896 | |||
r897 | try: | ||
r909 | shutil.rmtree(path, ignore_errors=True) | ||
os.makedirs(path) | |||
print 'made dirs %s' % jn(path) | |||
r897 | except OSError: | ||
r909 | raise | ||
r897 | |||
r909 | Command(cwd).execute("""echo '' > %s""" % added_file) | ||
Command(cwd).execute("""hg init %s""" % path) | |||
Command(cwd).execute("""hg add %s""" % added_file) | |||
r897 | |||
for i in xrange(2): | |||
cmd = """echo 'added_line%s' >> %s""" % (i, added_file) | |||
r909 | Command(cwd).execute(cmd) | ||
r897 | |||
cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file) | |||
r909 | Command(cwd).execute(cmd) | ||
r897 | |||
r909 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ | ||
{'user':USER, | |||
'pass':PASS, | |||
'host':HOST, | |||
'cloned_repo':HG_REPO + '_error', | |||
'dest':jn(TESTS_TMP_PATH, HG_REPO)} | |||
stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) | |||
r1498 | if not """abort: HTTP Error 403: Forbidden""" in stderr: | ||
raise Exception('Failure') | |||
r909 | |||
r896 | |||
if __name__ == '__main__': | |||
r1008 | create_test_user(force=False) | ||
create_test_repo() | |||
r1498 | |||
# test_push_modify_file() | |||
test_clone_with_credentials() | |||
test_clone_wrong_credentials() | |||
r910 | |||
r988 | |||
r1296 | test_push_new_file(commits=2, with_clone=True) | ||
r1498 | # | ||
test_push_wrong_path() | |||
test_clone_anonymous() | |||
test_push_wrong_credentials() |