diff --git a/rhodecode/tests/test_hg_operations.py b/rhodecode/tests/test_hg_operations.py --- a/rhodecode/tests/test_hg_operations.py +++ b/rhodecode/tests/test_hg_operations.py @@ -25,7 +25,7 @@ from sqlalchemy import engine_from_confi from rhodecode.lib.utils import add_cache from rhodecode.model import init_model from rhodecode.model import meta -from rhodecode.model.db import User +from rhodecode.model.db import User, Repository from rhodecode.lib.auth import get_crypt_password from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO @@ -70,15 +70,18 @@ def get_session(): def create_test_user(force=True): + print 'creating test user' sa = get_session() user = sa.query(User).filter(User.username == USER).scalar() - if force and user: + if force and user is not None: + print 'removing current user' sa.delete(user) sa.commit() if user is None or force: + print 'creating new one' new_usr = User() new_usr.username = USER new_usr.password = get_crypt_password(PASS) @@ -86,17 +89,38 @@ def create_test_user(force=True): new_usr.name = 'test' new_usr.lastname = 'lasttestname' new_usr.active = True - + new_usr.admin = True sa.add(new_usr) sa.commit() + print 'done' + + +def create_test_repo(force=True): + from rhodecode.model.repo import RepoModel + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + if user is None: + raise Exception('user not found') + repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() + + if repo is None: + print 'repo not found creating' + + form_data = {'repo_name':HG_REPO, + 'repo_type':'hg', + 'private':False, } + rm = RepoModel(sa) + rm.base_path = '/home/hg' + rm.create(form_data, user) #============================================================================== # TESTS #============================================================================== -def test_clone(): +def test_clone(no_errors=False): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) try: @@ -116,8 +140,9 @@ def test_clone(): stdout, stderr = Command(cwd).execute('hg clone', clone_url) - assert """adding file changes""" in stdout, 'no messages about cloning' - assert """abort""" not in stderr , 'got error from clone' + if no_errors is False: + assert """adding file changes""" in stdout, 'no messages about cloning' + assert """abort""" not in stderr , 'got error from clone' @@ -170,9 +195,9 @@ def test_clone_wrong_credentials(): def test_pull(): pass -def test_push(): - - modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') +def test_push_modify_file(f_name='setup.py'): + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) for i in xrange(5): cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) Command(cwd).execute(cmd) @@ -184,7 +209,7 @@ def test_push(): def test_push_new_file(commits=15): - test_clone() + test_clone(no_errors=True) cwd = path = jn(TESTS_TMP_PATH, HG_REPO) added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next()) @@ -210,7 +235,7 @@ def test_push_new_file(commits=15): Command(cwd).execute('hg push --verbose --debug %s' % push_url) def test_push_wrong_credentials(): - + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ {'user':USER + 'xxx', 'pass':PASS, @@ -262,14 +287,16 @@ def test_push_wrong_path(): if __name__ == '__main__': - #create_test_user() - test_clone() - test_clone_anonymous_ok() + create_test_user(force=False) + create_test_repo() + #test_push_modify_file() + #test_clone() + #test_clone_anonymous_ok() #test_clone_wrong_credentials() #test_pull() - #test_push_new_file(3) + test_push_new_file(commits=3) #test_push_wrong_path() #test_push_wrong_credentials()