test_hg_operations.py
228 lines
| 6.3 KiB
| text/x-python
|
PythonLexer
# -*- coding: utf-8 -*-
"""
    rhodecode.tests.test_hg_operations
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    Test suite for making push/pull operations

    :created_on: Dec 30, 2010
    :copyright: (c) 2010 by marcink.
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
"""
import os | ||||
import shutil | ||||
import logging | ||||
from subprocess import Popen, PIPE | ||||
from os.path import join as jn | ||||
from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO | ||||
# Credentials and address used to build the http:// URLs the hg commands
# in this module are pointed at.
USER, PASS = 'test_admin', 'test12'
HOST = '127.0.0.1:5000'

# When true, Command.execute echoes every command line and its captured
# output to standard output.
DEBUG = True

log = logging.getLogger(__name__)
r909 | ||||
class Command(object):
    """Run shell command lines from a fixed working directory."""

    def __init__(self, cwd):
        """Store ``cwd``, the directory every command is executed in."""
        self.cwd = cwd

    def execute(self, cmd, *args):
        """Run ``cmd`` followed by ``args`` through the system shell.

        :param cmd: the command text, e.g. ``'hg clone'``
        :param args: extra strings appended to ``cmd``, separated by spaces
        :returns: ``(stdout, stderr)`` exactly as returned by
            ``Popen.communicate()``
        """
        # Joining cmd and args in one pass avoids a dangling trailing space
        # when no extra arguments are supplied.
        command = ' '.join((cmd,) + args)
        # Let the logging module interpolate, so the string is only built
        # when the debug level is actually enabled.
        log.debug('Executing %s', command)
        if DEBUG:
            print(command)

        # shell=True is deliberate: callers in this module pass complete
        # shell lines, including ``>`` / ``>>`` redirections.
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
        stdout, stderr = p.communicate()
        if DEBUG:
            # Pre-format into a single string so the call prints the same
            # way whether ``print`` is a statement or a function.
            print('%s %s' % (stdout, stderr))
        return stdout, stderr
r896 | ||||
#==============================================================================
# TESTS
#==============================================================================
def test_clone():
    """Clone ``HG_REPO`` over HTTP with the configured credentials.

    The destination directory is wiped and recreated first; the test then
    checks that hg reported adding file changes and did not abort.
    """
    path = jn(TESTS_TMP_PATH, HG_REPO)
    cwd = path

    # Start from an empty destination. Any OSError raised by makedirs
    # simply propagates to the caller.
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)

    url_parts = {
        'user': USER,
        'pass': PASS,
        'host': HOST,
        'cloned_repo': HG_REPO,
        'dest': path,
    }
    clone_url = ('http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s'
                 % url_parts)

    stdout, stderr = Command(cwd).execute('hg clone', clone_url)

    assert 'adding file changes' in stdout, 'no messages about cloning'
    assert 'abort' not in stderr, 'got error from clone'
def test_clone_anonymous_ok():
    """Clone ``HG_REPO`` over HTTP without putting credentials in the URL.

    The destination directory is wiped and recreated first; the test then
    checks that hg reported adding file changes and did not abort.
    """
    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)

    # Start from an empty destination. Any OSError raised by makedirs
    # simply propagates to the caller.
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)

    # The anonymous URL has no user/password part, so only the fields the
    # template actually references are supplied.
    clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % {
        'host': HOST,
        'cloned_repo': HG_REPO,
        'dest': path,
    }

    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
    # Pre-format into a single string so the call prints the same way
    # whether ``print`` is a statement or a function.
    print('%s %s' % (stdout, stderr))

    assert 'adding file changes' in stdout, 'no messages about cloning'
    assert 'abort' not in stderr, 'got error from clone'
def test_clone_wrong_credentials():
    """Attempt a clone with a mangled user name and expect hg to abort.

    The destination directory is wiped and recreated first; the test then
    checks stderr for the authorization failure message.
    """
    path = jn(TESTS_TMP_PATH, HG_REPO)
    cwd = path

    # Start from an empty destination. Any OSError raised by makedirs
    # simply propagates to the caller.
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)

    url_parts = {
        'user': USER + 'error',  # deliberately not the configured user name
        'pass': PASS,
        'host': HOST,
        'cloned_repo': HG_REPO,
        'dest': path,
    }
    clone_url = ('http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s'
                 % url_parts)

    stdout, stderr = Command(cwd).execute('hg clone', clone_url)

    assert 'abort: authorization failed' in stderr, 'no error from wrong credentials'
r896 | ||||
def test_pull():
    """Placeholder: no pull scenario is implemented; the function does nothing."""
def test_push():
    """Append lines to ``setup.py`` in the test repository, commit, then push.

    Five times over, a line is appended to ``setup.py`` and the change is
    committed; afterwards ``hg push`` is run. No assertion is made on the
    outcome of any command.
    """
    # ``cwd`` was previously never defined here, so the first Command(cwd)
    # raised NameError. Use the same working directory as the other tests.
    cwd = jn(TESTS_TMP_PATH, HG_REPO)
    modified_file = jn(cwd, 'setup.py')

    for i in range(5):
        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
        Command(cwd).execute(cmd)

        cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
        Command(cwd).execute(cmd)

    # NOTE(review): the push target is the same path as the working
    # directory, so this pushes the repository to itself - confirm whether
    # a remote URL (as in test_push_new_file) was intended.
    Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
r896 | ||||
def test_push_new_file():
    """Clone the repository, add ``setup.py``, commit 15 edits and push.

    Runs ``test_clone`` first to obtain a working copy. No assertion is
    made on the outcome of the add, commit or push commands.
    """
    test_clone()

    repo_dir = jn(TESTS_TMP_PATH, HG_REPO)
    added_file = jn(repo_dir, 'setup.py')
    shell = Command(repo_dir)

    shell.execute('touch %s' % added_file)
    shell.execute('hg add %s' % added_file)

    # One appended line and one commit per iteration.
    for i in range(15):
        shell.execute("""echo 'added_line%s' >> %s""" % (i, added_file))
        shell.execute("""hg ci -m 'commited new %s' %s """ % (i, added_file))

    url_parts = {
        'user': USER,
        'pass': PASS,
        'host': HOST,
        'cloned_repo': HG_REPO,
        'dest': jn(TESTS_TMP_PATH, HG_REPO),  # not referenced by the template
    }
    push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % url_parts
    shell.execute('hg push %s' % push_url)
r896 | ||||
def test_push_wrong_credentials():
    """Commit five edits to ``setup.py`` and push with a mangled user name.

    NOTE(review): nothing is asserted about the push result; presumably an
    authorization failure is expected - confirm the exact message and add
    an assertion.
    """
    # ``cwd`` was previously never defined here, so the first Command(cwd)
    # raised NameError. Use the same working directory as the other tests.
    cwd = jn(TESTS_TMP_PATH, HG_REPO)
    modified_file = jn(cwd, 'setup.py')

    push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % {
        'user': USER + 'xxx',  # deliberately not the configured user name
        'pass': PASS,
        'host': HOST,
        'cloned_repo': HG_REPO,
    }

    for i in range(5):
        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
        Command(cwd).execute(cmd)

        cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
        Command(cwd).execute(cmd)

    Command(cwd).execute('hg push %s' % push_url)
r896 | ||||
def test_push_wrong_path():
    """Push a freshly initialised repository to a repository name with
    ``_error`` appended and expect an HTTP 403 abort from hg.
    """
    path = jn(TESTS_TMP_PATH, HG_REPO)
    cwd = path
    added_file = jn(path, 'somefile.py')

    # Start from an empty directory. Any OSError raised by makedirs simply
    # propagates to the caller.
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)
    print('made dirs %s' % jn(path))

    shell = Command(cwd)
    # The file is created before the repository is initialised around it.
    shell.execute("""echo '' > %s""" % added_file)
    shell.execute("""hg init %s""" % path)
    shell.execute("""hg add %s""" % added_file)

    # One appended line and one commit per iteration.
    for i in range(2):
        shell.execute("""echo 'added_line%s' >> %s""" % (i, added_file))
        shell.execute("""hg ci -m 'commited new %s' %s """ % (i, added_file))

    url_parts = {
        'user': USER,
        'pass': PASS,
        'host': HOST,
        'cloned_repo': HG_REPO + '_error',  # deliberately altered repository name
        'dest': jn(TESTS_TMP_PATH, HG_REPO),  # not referenced by the template
    }
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % url_parts

    stdout, stderr = shell.execute('hg push %s' % clone_url)
    assert 'abort: HTTP Error 403: Forbidden' in stderr
r896 | ||||
if __name__ == '__main__':
    # Only the clone and pull checks run when the module is executed
    # directly; the remaining scenarios are left disabled below.
    for scenario in (test_clone, test_pull):
        scenario()

    #test_clone_wrong_credentials()
    ##test_clone_anonymous_ok()
    #test_push_new_file()
    #test_push_wrong_path()
    #test_push_wrong_credentials()