# HG changeset patch # User Marcin Kuzminski # Date 2011-10-07 21:17:45 # Node ID 0b268dd369ecbe88eb4a2cfefbd1aa4b6d5b59a5 # Parent 1d7a621d396f0641e7124d80876335e07a988bd1 Fixed test_hg_operations test and added concurency test diff --git a/rhodecode/lib/middleware/simplehg.py b/rhodecode/lib/middleware/simplehg.py --- a/rhodecode/lib/middleware/simplehg.py +++ b/rhodecode/lib/middleware/simplehg.py @@ -76,7 +76,7 @@ class SimpleHg(object): # skip passing error to error controller environ['pylons.status_code_redirect'] = True - + #====================================================================== # EXTRACT REPOSITORY NAME FROM ENV #====================================================================== @@ -90,12 +90,13 @@ class SimpleHg(object): # GET ACTION PULL or PUSH #====================================================================== action = self.__get_action(environ) - + #====================================================================== # CHECK ANONYMOUS PERMISSION #====================================================================== if action in ['pull', 'push']: anonymous_user = self.__get_user('default') + username = anonymous_user.username anonymous_perm = self.__check_permission(action, anonymous_user, @@ -152,13 +153,13 @@ class SimpleHg(object): #====================================================================== # MERCURIAL REQUEST HANDLING #====================================================================== - + repo_path = safe_str(os.path.join(self.basepath, repo_name)) log.debug('Repository path is %s' % repo_path) - + baseui = make_ui('db') self.__inject_extras(repo_path, baseui, extras) - + # quick check if that dir exists... if is_valid_repo(repo_name, self.basepath) is False: @@ -257,7 +258,7 @@ class SimpleHg(object): push requests""" invalidate_cache('get_repo_cached_%s' % repo_name) - def __inject_extras(self,repo_path, baseui, extras={}): + def __inject_extras(self, repo_path, baseui, extras={}): """ Injects some extra params into baseui instance diff --git a/rhodecode/tests/test_concurency.py b/rhodecode/tests/test_concurency.py new file mode 100755 --- /dev/null +++ b/rhodecode/tests/test_concurency.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +""" + rhodecode.tests.test_hg_operations + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Test suite for making push/pull operations + + :created_on: Dec 30, 2010 + :copyright: (c) 2010 by marcink. + :license: LICENSE_NAME, see LICENSE_FILE for more details. +""" + +import os +import sys +import shutil +import logging +from os.path import join as jn +from os.path import dirname as dn + +from tempfile import _RandomNameSequence +from subprocess import Popen, PIPE + +from paste.deploy import appconfig +from pylons import config +from sqlalchemy import engine_from_config + +from rhodecode.lib.utils import add_cache +from rhodecode.model import init_model +from rhodecode.model import meta +from rhodecode.model.db import User, Repository +from rhodecode.lib.auth import get_crypt_password + +from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO +from rhodecode.config.environment import load_environment + +rel_path = dn(dn(dn(os.path.abspath(__file__)))) +conf = appconfig('config:development.ini', relative_to=rel_path) +load_environment(conf.global_conf, conf.local_conf) + +add_cache(conf) + +USER = 'test_admin' +PASS = 'test12' +HOST = '127.0.0.1:5000' +DEBUG = True +log = logging.getLogger(__name__) + + +class Command(object): + + def __init__(self, cwd): + self.cwd = cwd + + def execute(self, cmd, *args): + """Runs command on the system with given ``args``. + """ + + command = cmd + ' ' + ' '.join(args) + log.debug('Executing %s' % command) + if DEBUG: + print command + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) + stdout, stderr = p.communicate() + if DEBUG: + print stdout, stderr + return stdout, stderr + +def get_session(): + engine = engine_from_config(conf, 'sqlalchemy.db1.') + init_model(engine) + sa = meta.Session() + return sa + + +def create_test_user(force=True): + print 'creating test user' + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + + if force and user is not None: + print 'removing current user' + for repo in sa.query(Repository).filter(Repository.user == user).all(): + sa.delete(repo) + sa.delete(user) + sa.commit() + + if user is None or force: + print 'creating new one' + new_usr = User() + new_usr.username = USER + new_usr.password = get_crypt_password(PASS) + new_usr.email = 'mail@mail.com' + new_usr.name = 'test' + new_usr.lastname = 'lasttestname' + new_usr.active = True + new_usr.admin = True + sa.add(new_usr) + sa.commit() + + print 'done' + + +def create_test_repo(force=True): + print 'creating test repo' + from rhodecode.model.repo import RepoModel + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + if user is None: + raise Exception('user not found') + + + repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() + + if repo is None: + print 'repo not found creating' + + form_data = {'repo_name':HG_REPO, + 'repo_type':'hg', + 'private':False, + 'clone_uri':'' } + rm = RepoModel(sa) + rm.base_path = '/home/hg' + rm.create(form_data, user) + + print 'done' + +def set_anonymous_access(enable=True): + sa = get_session() + user = sa.query(User).filter(User.username == 'default').one() + user.active = enable + sa.add(user) + sa.commit() + +def get_anonymous_access(): + sa = get_session() + return sa.query(User).filter(User.username == 'default').one().active + + +#============================================================================== +# TESTS +#============================================================================== +def test_clone_with_credentials(no_errors=False,repo=HG_REPO): + cwd = path = jn(TESTS_TMP_PATH, repo) + + + try: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path) + #print 'made dirs %s' % jn(path) + except OSError: + raise + + + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ + {'user':USER, + 'pass':PASS, + 'host':HOST, + 'cloned_repo':repo, + 'dest':path+_RandomNameSequence().next()} + + stdout, stderr = Command(cwd).execute('hg clone', clone_url) + + if no_errors is False: + assert """adding file changes""" in stdout, 'no messages about cloning' + assert """abort""" not in stderr , 'got error from clone' + +if __name__ == '__main__': + try: + create_test_user(force=False) + + for i in range(int(sys.argv[2])): + test_clone_with_credentials(repo=sys.argv[1]) + + except Exception,e: + sys.exit('stop on %s' % e) diff --git a/rhodecode/tests/test_hg_operations.py b/rhodecode/tests/test_hg_operations.py --- a/rhodecode/tests/test_hg_operations.py +++ b/rhodecode/tests/test_hg_operations.py @@ -29,7 +29,7 @@ from sqlalchemy import engine_from_confi from rhodecode.lib.utils import add_cache from rhodecode.model import init_model from rhodecode.model import meta -from rhodecode.model.db import User, Repository +from rhodecode.model.db import User, Repository, UserLog from rhodecode.lib.auth import get_crypt_password from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO @@ -45,7 +45,7 @@ USER = 'test_admin' PASS = 'test12' HOST = '127.0.0.1:5000' DEBUG = bool(int(sys.argv[1])) -print 'DEBUG:',DEBUG +print 'DEBUG:', DEBUG log = logging.getLogger(__name__) @@ -70,15 +70,17 @@ class Command(object): def test_wrapp(func): - - def __wrapp(*args,**kwargs): - print '###%s###' %func.__name__ + + def __wrapp(*args, **kwargs): + print '###%s###' % func.__name__ try: - res = func(*args,**kwargs) - except: - print '--%s failed--' % func.__name__ - return - print 'ok' + res = func(*args, **kwargs) + except Exception, e: + print ('###############\n-' + '--%s failed %s--\n' + '###############\n' % (func.__name__, e)) + sys.exit() + print '++OK++' return res return __wrapp @@ -90,20 +92,20 @@ def get_session(): def create_test_user(force=True): - print 'creating test user' + print '\tcreating test user' sa = get_session() user = sa.query(User).filter(User.username == USER).scalar() if force and user is not None: - print 'removing current user' + print '\tremoving current user' for repo in sa.query(Repository).filter(Repository.user == user).all(): sa.delete(repo) sa.delete(user) sa.commit() if user is None or force: - print 'creating new one' + print '\tcreating new one' new_usr = User() new_usr.username = USER new_usr.password = get_crypt_password(PASS) @@ -115,7 +117,7 @@ def create_test_user(force=True): sa.add(new_usr) sa.commit() - print 'done' + print '\tdone' def create_test_repo(force=True): @@ -130,7 +132,7 @@ def create_test_repo(force=True): repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() if repo is None: - print 'repo not found creating' + print '\trepo not found creating' form_data = {'repo_name':HG_REPO, 'repo_type':'hg', @@ -144,12 +146,13 @@ def create_test_repo(force=True): def set_anonymous_access(enable=True): sa = get_session() user = sa.query(User).filter(User.username == 'default').one() + sa.expire(user) user.active = enable sa.add(user) sa.commit() sa.remove() - - print 'anonymous access is now:',enable + import time;time.sleep(3) + print '\tanonymous access is now:', enable def get_anonymous_access(): @@ -173,13 +176,13 @@ def test_clone_with_credentials(no_error except OSError: raise - print 'checking if anonymous access is enabled' + print '\tchecking if anonymous access is enabled' anonymous_access = get_anonymous_access() if anonymous_access: - print 'enabled, disabling it ' + print '\tenabled, disabling it ' set_anonymous_access(enable=False) time.sleep(1) - + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER, 'pass':PASS, @@ -206,13 +209,13 @@ def test_clone_anonymous(): raise - print 'checking if anonymous access is enabled' + print '\tchecking if anonymous access is enabled' anonymous_access = get_anonymous_access() if not anonymous_access: - print 'not enabled, enabling it ' + print '\tnot enabled, enabling it ' set_anonymous_access(enable=True) time.sleep(1) - + clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER, 'pass':PASS, @@ -227,7 +230,7 @@ def test_clone_anonymous(): #disable if it was enabled if not anonymous_access: - print 'disabling anonymous access' + print '\tdisabling anonymous access' set_anonymous_access(enable=False) @test_wrapp @@ -241,12 +244,12 @@ def test_clone_wrong_credentials(): except OSError: raise - print 'checking if anonymous access is enabled' + print '\tchecking if anonymous access is enabled' anonymous_access = get_anonymous_access() if anonymous_access: - print 'enabled, disabling it ' + print '\tenabled, disabling it ' set_anonymous_access(enable=False) - + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER + 'error', 'pass':PASS, @@ -257,7 +260,7 @@ def test_clone_wrong_credentials(): stdout, stderr = Command(cwd).execute('hg clone', clone_url) if not """abort: authorization failed""" in stderr: - raise Exception('Failure') + raise Exception('Failure') @test_wrapp def test_pull(): @@ -335,7 +338,7 @@ def test_push_wrong_path(): try: shutil.rmtree(path, ignore_errors=True) os.makedirs(path) - print 'made dirs %s' % jn(path) + print '\tmade dirs %s' % jn(path) except OSError: raise @@ -361,19 +364,37 @@ def test_push_wrong_path(): if not """abort: HTTP Error 403: Forbidden""" in stderr: raise Exception('Failure') +@test_wrapp +def get_logs(): + sa = get_session() + return len(sa.query(UserLog).all()) + +@test_wrapp +def test_logs(initial): + sa = get_session() + logs = sa.query(UserLog).all() + operations = 7 + if initial + operations != len(logs): + raise Exception("missing number of logs %s vs %s" % (initial, len(logs))) + if __name__ == '__main__': create_test_user(force=False) create_test_repo() - + + initial_logs = get_logs() + # test_push_modify_file() test_clone_with_credentials() test_clone_wrong_credentials() test_push_new_file(commits=2, with_clone=True) -# + + test_clone_anonymous() test_push_wrong_path() - - test_clone_anonymous() - test_push_wrong_credentials() \ No newline at end of file + + + test_push_wrong_credentials() + + test_logs(initial_logs)