##// END OF EJS Templates
use str() on os.walk passing unicode can lead to UnicodeDecode errors when iterating
use str() on os.walk passing unicode can lead to UnicodeDecode errors when iterating

File last commit:

r2895:f5dc0417 beta
r2963:742d1b8c beta
Show More
test_concurency.py
221 lines | 6.4 KiB | text/x-python | PythonLexer
orginized test module...
r2527 # -*- coding: utf-8 -*-
"""
rhodecode.tests.test_hg_operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test suite for making push/pull operations
:created_on: Dec 30, 2010
:author: marcink
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import shutil
import logging
from os.path import join as jn
from os.path import dirname as dn
from tempfile import _RandomNameSequence
from subprocess import Popen, PIPE
from paste.deploy import appconfig
from pylons import config
from sqlalchemy import engine_from_config
from rhodecode.lib.utils import add_cache
from rhodecode.model import init_model
from rhodecode.model import meta
from rhodecode.model.db import User, Repository
from rhodecode.lib.auth import get_crypt_password
from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
from rhodecode.config.environment import load_environment
made concurrency test also for git
r2895 rel_path = dn(dn(dn(dn(os.path.abspath(__file__)))))
conf = appconfig('config:rc.ini', relative_to=rel_path)
orginized test module...
r2527 load_environment(conf.global_conf, conf.local_conf)
add_cache(conf)
USER = 'test_admin'
PASS = 'test12'
made concurrency test also for git
r2895 HOST = 'rc.local'
orginized test module...
r2527 METHOD = 'pull'
DEBUG = True
log = logging.getLogger(__name__)
class Command(object):
def __init__(self, cwd):
self.cwd = cwd
def execute(self, cmd, *args):
"""Runs command on the system with given ``args``.
"""
command = cmd + ' ' + ' '.join(args)
log.debug('Executing %s' % command)
if DEBUG:
print command
p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
stdout, stderr = p.communicate()
if DEBUG:
print stdout, stderr
return stdout, stderr
def get_session():
engine = engine_from_config(conf, 'sqlalchemy.db1.')
init_model(engine)
sa = meta.Session
return sa
def create_test_user(force=True):
print 'creating test user'
sa = get_session()
user = sa.query(User).filter(User.username == USER).scalar()
if force and user is not None:
print 'removing current user'
for repo in sa.query(Repository).filter(Repository.user == user).all():
sa.delete(repo)
sa.delete(user)
sa.commit()
if user is None or force:
print 'creating new one'
new_usr = User()
new_usr.username = USER
new_usr.password = get_crypt_password(PASS)
new_usr.email = 'mail@mail.com'
new_usr.name = 'test'
new_usr.lastname = 'lasttestname'
new_usr.active = True
new_usr.admin = True
sa.add(new_usr)
sa.commit()
print 'done'
def create_test_repo(force=True):
print 'creating test repo'
from rhodecode.model.repo import RepoModel
sa = get_session()
user = sa.query(User).filter(User.username == USER).scalar()
if user is None:
raise Exception('user not found')
repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
if repo is None:
print 'repo not found creating'
made concurrency test also for git
r2895 form_data = {'repo_name': HG_REPO,
'repo_type': 'hg',
orginized test module...
r2527 'private':False,
made concurrency test also for git
r2895 'clone_uri': '' }
orginized test module...
r2527 rm = RepoModel(sa)
rm.base_path = '/home/hg'
rm.create(form_data, user)
print 'done'
def set_anonymous_access(enable=True):
sa = get_session()
user = sa.query(User).filter(User.username == 'default').one()
user.active = enable
sa.add(user)
sa.commit()
def get_anonymous_access():
sa = get_session()
return sa.query(User).filter(User.username == 'default').one().active
#==============================================================================
# TESTS
#==============================================================================
def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
made concurrency test also for git
r2895 seq=None, backend='hg'):
orginized test module...
r2527 cwd = path = jn(TESTS_TMP_PATH, repo)
if seq == None:
seq = _RandomNameSequence().next()
try:
shutil.rmtree(path, ignore_errors=True)
os.makedirs(path)
#print 'made dirs %s' % jn(path)
except OSError:
raise
clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
made concurrency test also for git
r2895 {'user': USER,
'pass': PASS,
'host': HOST,
'cloned_repo': repo, }
orginized test module...
r2527
dest = path + seq
if method == 'pull':
made concurrency test also for git
r2895 stdout, stderr = Command(cwd).execute(backend, method, '--cwd', dest, clone_url)
orginized test module...
r2527 else:
made concurrency test also for git
r2895 stdout, stderr = Command(cwd).execute(backend, method, clone_url, dest)
print stdout,'sdasdsadsa'
orginized test module...
r2527 if no_errors is False:
made concurrency test also for git
r2895 if backend == 'hg':
assert """adding file changes""" in stdout, 'no messages about cloning'
assert """abort""" not in stderr , 'got error from clone'
elif backend == 'git':
assert """Cloning into""" in stdout, 'no messages about cloning'
orginized test module...
r2527
if __name__ == '__main__':
try:
create_test_user(force=False)
seq = None
import time
try:
METHOD = sys.argv[3]
except:
pass
made concurrency test also for git
r2895 try:
backend = sys.argv[4]
except:
backend = 'hg'
orginized test module...
r2527 if METHOD == 'pull':
seq = _RandomNameSequence().next()
test_clone_with_credentials(repo=sys.argv[1], method='clone',
made concurrency test also for git
r2895 seq=seq, backend=backend)
orginized test module...
r2527 s = time.time()
for i in range(1, int(sys.argv[2]) + 1):
print 'take', i
test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
made concurrency test also for git
r2895 seq=seq, backend=backend)
orginized test module...
r2527 print 'time taken %.3f' % (time.time() - s)
except Exception, e:
raise
sys.exit('stop on %s' % e)