##// END OF EJS Templates
fixes rhodecode upgrade problem that caused setuptool to crash on importing sqlalchemy models
fixes rhodecode upgrade problem that caused setuptool to crash on importing sqlalchemy models

File last commit:

r1532:2afe9320 beta
r1541:70e646b2 beta
Show More
test_hg_operations.py
412 lines | 11.8 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
"""
rhodecode.tests.test_hg_operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test suite for making push/pull operations
:created_on: Dec 30, 2010
:copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import sys
import shutil
import logging
from os.path import join as jn
from os.path import dirname as dn
from tempfile import _RandomNameSequence
from subprocess import Popen, PIPE
from paste.deploy import appconfig
from pylons import config
from sqlalchemy import engine_from_config
from rhodecode.lib.utils import add_cache
from rhodecode.model import init_model
from rhodecode.model import meta
from rhodecode.model.db import User, Repository, UserLog
from rhodecode.lib.auth import get_crypt_password
from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
from rhodecode.config.environment import load_environment
rel_path = dn(dn(dn(os.path.abspath(__file__))))
conf = appconfig('config:development.ini', relative_to=rel_path)
load_environment(conf.global_conf, conf.local_conf)
add_cache(conf)
USER = 'test_admin'
PASS = 'test12'
HOST = '127.0.0.1:5000'
DEBUG = True if sys.argv[1:] else False
print 'DEBUG:', DEBUG
log = logging.getLogger(__name__)
class Command(object):
def __init__(self, cwd):
self.cwd = cwd
def execute(self, cmd, *args):
"""Runs command on the system with given ``args``.
"""
command = cmd + ' ' + ' '.join(args)
log.debug('Executing %s' % command)
if DEBUG:
print command
p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
stdout, stderr = p.communicate()
if DEBUG:
print stdout, stderr
return stdout, stderr
def test_wrapp(func):
def __wrapp(*args, **kwargs):
print '>>>%s' % func.__name__
try:
res = func(*args, **kwargs)
except Exception, e:
print ('###############\n-'
'--%s failed %s--\n'
'###############\n' % (func.__name__, e))
sys.exit()
print '++OK++'
return res
return __wrapp
def get_session():
engine = engine_from_config(conf, 'sqlalchemy.db1.')
init_model(engine)
sa = meta.Session
return sa
def create_test_user(force=True):
print '\tcreating test user'
sa = get_session()
user = sa.query(User).filter(User.username == USER).scalar()
if force and user is not None:
print '\tremoving current user'
for repo in sa.query(Repository).filter(Repository.user == user).all():
sa.delete(repo)
sa.delete(user)
sa.commit()
if user is None or force:
print '\tcreating new one'
new_usr = User()
new_usr.username = USER
new_usr.password = get_crypt_password(PASS)
new_usr.email = 'mail@mail.com'
new_usr.name = 'test'
new_usr.lastname = 'lasttestname'
new_usr.active = True
new_usr.admin = True
sa.add(new_usr)
sa.commit()
print '\tdone'
def create_test_repo(force=True):
from rhodecode.model.repo import RepoModel
sa = get_session()
user = sa.query(User).filter(User.username == USER).scalar()
if user is None:
raise Exception('user not found')
repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
if repo is None:
print '\trepo not found creating'
form_data = {'repo_name':HG_REPO,
'repo_type':'hg',
'private':False,
'clone_uri':'' }
rm = RepoModel(sa)
rm.base_path = '/home/hg'
rm.create(form_data, user)
def set_anonymous_access(enable=True):
sa = get_session()
user = sa.query(User).filter(User.username == 'default').one()
sa.expire(user)
user.active = enable
sa.add(user)
sa.commit()
sa.remove()
import time;time.sleep(3)
print '\tanonymous access is now:', enable
def get_anonymous_access():
sa = get_session()
obj1 = sa.query(User).filter(User.username == 'default').one()
sa.expire(obj1)
return obj1.active
#==============================================================================
# TESTS
#==============================================================================
@test_wrapp
def test_clone_with_credentials(no_errors=False):
cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
try:
shutil.rmtree(path, ignore_errors=True)
os.makedirs(path)
#print 'made dirs %s' % jn(path)
except OSError:
raise
print '\tchecking if anonymous access is enabled'
anonymous_access = get_anonymous_access()
if anonymous_access:
print '\tenabled, disabling it '
set_anonymous_access(enable=False)
time.sleep(1)
clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
{'user':USER,
'pass':PASS,
'host':HOST,
'cloned_repo':HG_REPO,
'dest':path}
stdout, stderr = Command(cwd).execute('hg clone', clone_url)
if no_errors is False:
assert """adding file changes""" in stdout, 'no messages about cloning'
assert """abort""" not in stderr , 'got error from clone'
@test_wrapp
def test_clone_anonymous():
cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
try:
shutil.rmtree(path, ignore_errors=True)
os.makedirs(path)
#print 'made dirs %s' % jn(path)
except OSError:
raise
print '\tchecking if anonymous access is enabled'
anonymous_access = get_anonymous_access()
if not anonymous_access:
print '\tnot enabled, enabling it '
set_anonymous_access(enable=True)
time.sleep(1)
clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
{'user':USER,
'pass':PASS,
'host':HOST,
'cloned_repo':HG_REPO,
'dest':path}
stdout, stderr = Command(cwd).execute('hg clone', clone_url)
assert """adding file changes""" in stdout, 'no messages about cloning'
assert """abort""" not in stderr , 'got error from clone'
#disable if it was enabled
if not anonymous_access:
print '\tdisabling anonymous access'
set_anonymous_access(enable=False)
@test_wrapp
def test_clone_wrong_credentials():
cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
try:
shutil.rmtree(path, ignore_errors=True)
os.makedirs(path)
#print 'made dirs %s' % jn(path)
except OSError:
raise
print '\tchecking if anonymous access is enabled'
anonymous_access = get_anonymous_access()
if anonymous_access:
print '\tenabled, disabling it '
set_anonymous_access(enable=False)
clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
{'user':USER + 'error',
'pass':PASS,
'host':HOST,
'cloned_repo':HG_REPO,
'dest':path}
stdout, stderr = Command(cwd).execute('hg clone', clone_url)
if not """abort: authorization failed""" in stderr:
raise Exception('Failure')
@test_wrapp
def test_pull():
pass
@test_wrapp
def test_push_modify_file(f_name='setup.py'):
cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
for i in xrange(5):
cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
Command(cwd).execute(cmd)
cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
Command(cwd).execute(cmd)
Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
@test_wrapp
def test_push_new_file(commits=15, with_clone=True):
if with_clone:
test_clone_with_credentials(no_errors=True)
cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
Command(cwd).execute('touch %s' % added_file)
Command(cwd).execute('hg add %s' % added_file)
for i in xrange(commits):
cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
Command(cwd).execute(cmd)
cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
'Marcin Kuźminski <marcin@python-blog.com>',
added_file)
Command(cwd).execute(cmd)
push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
{'user':USER,
'pass':PASS,
'host':HOST,
'cloned_repo':HG_REPO,
'dest':jn(TESTS_TMP_PATH, HG_REPO)}
Command(cwd).execute('hg push --verbose --debug %s' % push_url)
@test_wrapp
def test_push_wrong_credentials():
cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
{'user':USER + 'xxx',
'pass':PASS,
'host':HOST,
'cloned_repo':HG_REPO,
'dest':jn(TESTS_TMP_PATH, HG_REPO)}
modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
for i in xrange(5):
cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
Command(cwd).execute(cmd)
cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
Command(cwd).execute(cmd)
Command(cwd).execute('hg push %s' % clone_url)
@test_wrapp
def test_push_wrong_path():
cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
added_file = jn(path, 'somefile.py')
try:
shutil.rmtree(path, ignore_errors=True)
os.makedirs(path)
print '\tmade dirs %s' % jn(path)
except OSError:
raise
Command(cwd).execute("""echo '' > %s""" % added_file)
Command(cwd).execute("""hg init %s""" % path)
Command(cwd).execute("""hg add %s""" % added_file)
for i in xrange(2):
cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
Command(cwd).execute(cmd)
cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
Command(cwd).execute(cmd)
clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
{'user':USER,
'pass':PASS,
'host':HOST,
'cloned_repo':HG_REPO + '_error',
'dest':jn(TESTS_TMP_PATH, HG_REPO)}
stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
if not """abort: HTTP Error 403: Forbidden""" in stderr:
raise Exception('Failure')
@test_wrapp
def get_logs():
sa = get_session()
return len(sa.query(UserLog).all())
@test_wrapp
def test_logs(initial):
sa = get_session()
logs = sa.query(UserLog).all()
operations = 7
if initial + operations != len(logs):
raise Exception("missing number of logs %s vs %s" % (initial, len(logs)))
if __name__ == '__main__':
create_test_user(force=False)
create_test_repo()
initial_logs = get_logs()
# test_push_modify_file()
test_clone_with_credentials()
test_clone_wrong_credentials()
test_push_new_file(commits=2, with_clone=True)
test_clone_anonymous()
test_push_wrong_path()
test_push_wrong_credentials()
test_logs(initial_logs)