# -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Test suite for vcs push/pull operations. The tests need Git > 1.8.1. This file was forked by the Kallithea project in July 2014. Original author and date, and relevant copyright and licensing information is below: :created_on: Dec 30, 2010 :author: marcink :copyright: (c) 2013 RhodeCode GmbH, and others. :license: GPLv3, see LICENSE.md for more details. """ import json import os import re import tempfile import time import urllib.request from subprocess import PIPE, Popen from tempfile import _RandomNameSequence import pytest import kallithea from kallithea.lib.utils2 import ascii_bytes, safe_str from kallithea.model import db, meta from kallithea.model.ssh_key import SshKeyModel from kallithea.model.user import UserModel from kallithea.tests import base from kallithea.tests.fixture import Fixture DEBUG = True HOST = '127.0.0.1:4999' # test host fixture = Fixture() # Parameterize different kinds of VCS testing - both the kind of VCS and the # access method (HTTP/SSH) # Mixin for using HTTP and SSH URLs class HttpVcsTest(object): @staticmethod def repo_url_param(webserver, repo_name, **kwargs): return webserver.repo_url(repo_name, **kwargs) class SshVcsTest(object): public_keys = { base.TEST_USER_REGULAR_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost', base.TEST_USER_ADMIN_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost', } @classmethod def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR): user = db.User.get_by_username(username) if user.ssh_keys: ssh_key = user.ssh_keys[0] else: sshkeymodel = SshKeyModel() ssh_key = sshkeymodel.create(user, 'test key', cls.public_keys[user.username]) meta.Session().commit() return cls._ssh_param(repo_name, user, ssh_key, client_ip) # Mixins for using Mercurial and Git class HgVcsTest(object): repo_type = 'hg' repo_name = base.HG_REPO class GitVcsTest(object): repo_type = 'git' repo_name = base.GIT_REPO # Combine mixins to give the combinations we want to parameterize tests with class HgHttpVcsTest(HgVcsTest, HttpVcsTest): pass class GitHttpVcsTest(GitVcsTest, HttpVcsTest): pass class HgSshVcsTest(HgVcsTest, SshVcsTest): @staticmethod def _ssh_param(repo_name, user, ssh_key, client_ip): # Specify a custom ssh command on the command line return r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s""" % ( client_ip, kallithea.CONFIG['__file__'], user.user_id, ssh_key.user_ssh_key_id, repo_name) class GitSshVcsTest(GitVcsTest, SshVcsTest): @staticmethod def _ssh_param(repo_name, user, ssh_key, client_ip): # Set a custom ssh command in the global environment os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % ( client_ip, kallithea.CONFIG['__file__'], user.user_id, ssh_key.user_ssh_key_id) return "ssh://someuser@somehost/%s""" % repo_name parametrize_vcs_test = base.parametrize('vt', [ HgHttpVcsTest, GitHttpVcsTest, HgSshVcsTest, GitSshVcsTest, ]) parametrize_vcs_test_hg = base.parametrize('vt', [ HgHttpVcsTest, HgSshVcsTest, ]) parametrize_vcs_test_http = base.parametrize('vt', [ HgHttpVcsTest, GitHttpVcsTest, ]) class Command(object): def __init__(self, cwd): self.cwd = cwd def execute(self, *args, **environ): """ Runs command on the system with given ``args`` using simple space join without safe quoting. """ command = ' '.join(args) ignoreReturnCode = environ.pop('ignoreReturnCode', False) if DEBUG: print('*** CMD %s ***' % command) testenv = dict(os.environ) testenv['LANG'] = 'en_US.UTF-8' testenv['LANGUAGE'] = 'en_US:en' testenv['HGPLAIN'] = '' testenv['HGRCPATH'] = '' testenv['GIT_COMMITTER_NAME'] = base.TEST_USER_ADMIN_LOGIN testenv['GIT_COMMITTER_EMAIL'] = base.TEST_USER_ADMIN_EMAIL testenv['GIT_AUTHOR_NAME'] = base.TEST_USER_REGULAR_LOGIN testenv['GIT_AUTHOR_EMAIL'] = base.TEST_USER_REGULAR_EMAIL testenv.update(environ) p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv) stdout, stderr = p.communicate() if DEBUG: if stdout: print('stdout:', stdout) if stderr: print('stderr:', stderr) if not ignoreReturnCode: assert p.returncode == 0 return safe_str(stdout), safe_str(stderr) def _get_tmp_dir(prefix='vcs_operations-', suffix=''): return tempfile.mkdtemp(dir=base.TESTS_TMP_PATH, prefix=prefix, suffix=suffix) def _add_files(vcs, dest_dir, files_no=3): """ Generate some files, add it to dest_dir repo and push back vcs is git or hg and defines what VCS we want to make those files for :param vcs: :param dest_dir: """ added_file = '%ssetup.py' % next(_RandomNameSequence()) open(os.path.join(dest_dir, added_file), 'a').close() Command(dest_dir).execute(vcs, 'add', added_file) email = 'me@example.com' if os.name == 'nt': author_str = 'User <%s>' % email else: author_str = 'User ǝɯɐᴎ <%s>' % email for i in range(files_no): cmd = """echo "added_line%s" >> %s""" % (i, added_file) Command(dest_dir).execute(cmd) if vcs == 'hg': cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % ( i, author_str, added_file ) elif vcs == 'git': cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % ( i, author_str, added_file ) Command(dest_dir).execute(cmd) def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3): _add_files(vt.repo_type, dest_dir, files_no=files_no) # PUSH it back stdout = stderr = None if vt.repo_type == 'hg': stdout, stderr = Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode) elif vt.repo_type == 'git': stdout, stderr = Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode) return stdout, stderr def _check_outgoing(vcs, cwd, clone_url): if vcs == 'hg': # hg removes the password from default URLs, so we have to provide it here via the clone_url return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True) elif vcs == 'git': Command(cwd).execute('git remote update') return Command(cwd).execute('git log origin/master..master') def set_anonymous_access(enable=True): user = db.User.get_default_user() user.active = enable meta.Session().commit() if enable != db.User.get_default_user().active: raise Exception('Cannot set anonymous access') #============================================================================== # TESTS #============================================================================== def _check_proper_git_push(stdout, stderr): assert 'fatal' not in stderr assert 'rejected' not in stderr assert 'Pushing to' in stderr assert 'master -> master' in stderr @pytest.mark.usefixtures("test_context_fixture") class TestVCSOperations(base.TestController): @classmethod def setup_class(cls): # DISABLE ANONYMOUS ACCESS set_anonymous_access(False) @pytest.fixture() def testhook_cleanup(self): yield # remove hook for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']: entry = db.Ui.get_by_key('hooks', '%s.testhook' % hook) if entry: meta.Session().delete(entry) meta.Session().commit() @pytest.fixture(scope="module") def testfork(self): # create fork so the repo stays untouched git_fork_name = '%s_fork%s' % (base.GIT_REPO, next(_RandomNameSequence())) fixture.create_fork(base.GIT_REPO, git_fork_name) hg_fork_name = '%s_fork%s' % (base.HG_REPO, next(_RandomNameSequence())) fixture.create_fork(base.HG_REPO, hg_fork_name) return {'git': git_fork_name, 'hg': hg_fork_name} @parametrize_vcs_test def test_clone_repo_by_admin(self, webserver, vt): clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir()) if vt.repo_type == 'git': assert 'Cloning into' in stdout + stderr assert stderr == '' or stdout == '' elif vt.repo_type == 'hg': assert 'requesting all changes' in stdout assert 'adding changesets' in stdout assert 'adding manifests' in stdout assert 'adding file changes' in stdout assert stderr == '' @parametrize_vcs_test_http def test_clone_wrong_credentials(self, webserver, vt): clone_url = vt.repo_url_param(webserver, vt.repo_name, password='bad!') stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) if vt.repo_type == 'git': assert 'fatal: Authentication failed' in stderr elif vt.repo_type == 'hg': assert 'abort: authorization failed' in stderr def test_clone_git_dir_as_hg(self, webserver): clone_url = HgHttpVcsTest.repo_url_param(webserver, base.GIT_REPO) stdout, stderr = Command(base.TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'HTTP Error 404: Not Found' in stderr or "not a valid repository" in stdout and 'abort:' in stderr def test_clone_hg_repo_as_git(self, webserver): clone_url = GitHttpVcsTest.repo_url_param(webserver, base.HG_REPO) stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) assert 'not found' in stderr @parametrize_vcs_test def test_clone_non_existing_path(self, webserver, vt): clone_url = vt.repo_url_param(webserver, 'trololo') stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) if vt.repo_type == 'git': assert 'not found' in stderr or 'abort: Access to %r denied' % 'trololo' in stderr elif vt.repo_type == 'hg': assert 'HTTP Error 404: Not Found' in stderr or 'abort: no suitable response from remote hg' in stderr and 'remote: abort: Access to %r denied' % 'trololo' in stdout + stderr @parametrize_vcs_test def test_push_new_repo(self, webserver, vt): # Clear the log so we know what is added db.UserLog.query().delete() meta.Session().commit() # Create an empty server repo using the API repo_name = 'new_%s_%s' % (vt.repo_type, next(_RandomNameSequence())) usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) params = { "id": 7, "api_key": usr.api_key, "method": 'create_repo', "args": dict(repo_name=repo_name, owner=base.TEST_USER_ADMIN_LOGIN, repo_type=vt.repo_type), } req = urllib.request.Request( 'http://%s:%s/_admin/api' % webserver.server_address, data=ascii_bytes(json.dumps(params)), headers={'content-type': 'application/json'}) response = urllib.request.urlopen(req) result = json.loads(response.read()) # Expect something like: # {u'result': {u'msg': u'Created new repository `new_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None} assert result['result']['success'] # Create local clone of the empty server repo local_clone_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, local_clone_dir) # Make 3 commits and push to the empty server repo. # The server repo doesn't have any other heads than the # refs/heads/master we are pushing, but the `git log` in the push hook # should still list the 3 commits. stdout, stderr = _add_files_and_push(webserver, vt, local_clone_dir, clone_url=clone_url) if vt.repo_type == 'git': _check_proper_git_push(stdout, stderr) elif vt.repo_type == 'hg': assert 'pushing to ' in stdout assert 'remote: added ' in stdout # Verify that we got the right events in UserLog. Expect something like: # # # # meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...) action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)] assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == ([ ('started_following_repo', 0), ('user_created_repo', 0), ('pull', 0), ('push', 3)] if vt.repo_type == 'git' else [ ('started_following_repo', 0), ('user_created_repo', 0), # (u'pull', 0), # Mercurial outgoing hook is not called for empty clones ('push', 3)]) @parametrize_vcs_test def test_push_new_file(self, webserver, testfork, vt): db.UserLog.query().delete() meta.Session().commit() dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type]) stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url) if vt.repo_type == 'git': _check_proper_git_push(stdout, stderr) elif vt.repo_type == 'hg': assert 'pushing to' in stdout assert 'Repository size' in stdout assert 'Last revision is now' in stdout meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...) action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)] assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \ [('pull', 0), ('push', 3)] @parametrize_vcs_test def test_pull(self, webserver, testfork, vt): db.UserLog.query().delete() meta.Session().commit() dest_dir = _get_tmp_dir() stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'init', dest_dir) clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url) meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...) if vt.repo_type == 'git': assert 'FETCH_HEAD' in stderr elif vt.repo_type == 'hg': assert 'new changesets' in stdout action_parts = [ul.action for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)] assert action_parts == ['pull'] # Test handling of URLs with extra '/' around repo_name stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/./%s/' % vt.repo_name), ignoreReturnCode=True) if issubclass(vt, HttpVcsTest): if vt.repo_type == 'git': # NOTE: when pulling from http://hostname/./vcs_test_git/ , the git client will normalize that and issue an HTTP request to /vcs_test_git/info/refs assert 'Already up to date.' in stdout else: assert vt.repo_type == 'hg' assert "abort: HTTP Error 404: Not Found" in stderr else: assert issubclass(vt, SshVcsTest) if vt.repo_type == 'git': assert "abort: Access to './%s' denied" % vt.repo_name in stderr else: assert "abort: Access to './%s' denied" % vt.repo_name in stdout + stderr stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/%s/' % vt.repo_name), ignoreReturnCode=True) if vt.repo_type == 'git': assert 'Already up to date.' in stdout else: assert vt.repo_type == 'hg' assert "no changes found" in stdout assert "denied" not in stderr assert "denied" not in stdout assert "404" not in stdout @parametrize_vcs_test def test_push_invalidates_cache(self, webserver, testfork, vt): pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])] dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type]) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, files_no=1, clone_url=clone_url) if vt.repo_type == 'git': _check_proper_git_push(stdout, stderr) meta.Session.close() # expire session to make sure SA fetches new Repository instances after last_changeset has been updated by server side hook in another process post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])] assert pre_cached_tip != post_cached_tip @parametrize_vcs_test_http def test_push_wrong_credentials(self, webserver, vt): dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) clone_url = webserver.repo_url(vt.repo_name, username='bad', password='name') stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url, ignoreReturnCode=True) if vt.repo_type == 'git': assert 'fatal: Authentication failed' in stderr elif vt.repo_type == 'hg': assert 'abort: authorization failed' in stderr @parametrize_vcs_test def test_push_with_readonly_credentials(self, webserver, vt): db.UserLog.query().delete() meta.Session().commit() dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, vt.repo_name, username=base.TEST_USER_REGULAR_LOGIN, password=base.TEST_USER_REGULAR_PASS) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, ignoreReturnCode=True, clone_url=clone_url) if vt.repo_type == 'git': assert 'The requested URL returned error: 403' in stderr or 'abort: Push access to %r denied' % str(vt.repo_name) in stderr elif vt.repo_type == 'hg': assert 'abort: HTTP Error 403: Forbidden' in stderr or 'abort: push failed on remote' in stderr and 'remote: Push access to %r denied' % str(vt.repo_name) in stdout meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...) action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)] assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \ [('pull', 0)] @parametrize_vcs_test def test_push_back_to_wrong_url(self, webserver, vt): dest_dir = _get_tmp_dir() clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) stdout, stderr = _add_files_and_push( webserver, vt, dest_dir, clone_url='http://%s:%s/tmp' % ( webserver.server_address[0], webserver.server_address[1]), ignoreReturnCode=True) if vt.repo_type == 'git': assert 'not found' in stderr elif vt.repo_type == 'hg': assert 'HTTP Error 404: Not Found' in stderr @parametrize_vcs_test def test_ip_restriction(self, webserver, vt): user_model = UserModel() try: # Add IP constraint that excludes the test context: user_model.add_extra_ip(base.TEST_USER_ADMIN_LOGIN, '10.10.10.10/32') meta.Session().commit() # IP permissions are cached, need to wait for the cache in the server process to expire time.sleep(1.5) clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True) if vt.repo_type == 'git': # The message apparently changed in Git 1.8.3, so match it loosely. assert re.search(r'\b403\b', stderr) or 'abort: User test_admin from 127.0.0.127 cannot be authorized' in stderr elif vt.repo_type == 'hg': assert 'abort: HTTP Error 403: Forbidden' in stderr or 'remote: abort: User test_admin from 127.0.0.127 cannot be authorized' in stdout + stderr finally: # release IP restrictions for ip in db.UserIpMap.query(): db.UserIpMap.delete(ip.ip_id) meta.Session().commit() # IP permissions are cached, need to wait for the cache in the server process to expire time.sleep(1.5) clone_url = vt.repo_url_param(webserver, vt.repo_name) stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir()) if vt.repo_type == 'git': assert 'Cloning into' in stdout + stderr assert stderr == '' or stdout == '' elif vt.repo_type == 'hg': assert 'requesting all changes' in stdout assert 'adding changesets' in stdout assert 'adding manifests' in stdout assert 'adding file changes' in stdout assert stderr == '' @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, vt): # set prechangegroup to failing hook (returns True) db.Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook') meta.Session().commit() # clone repo clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS) dest_dir = _get_tmp_dir() stdout, stderr = Command(base.TESTS_TMP_PATH) \ .execute(vt.repo_type, 'clone', clone_url, dest_dir, ignoreReturnCode=True) if vt.repo_type == 'hg': assert 'preoutgoing.testhook hook failed' in stdout + stderr elif vt.repo_type == 'git': assert 'error: 406' in stderr @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, vt): # set prechangegroup to failing hook (returns exit code 1) db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook') meta.Session().commit() # clone repo clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS) dest_dir = _get_tmp_dir() stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir) stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=True) assert 'failing_test_hook failed' in stdout + stderr assert 'Traceback' not in stdout + stderr assert 'prechangegroup.testhook hook failed' in stdout + stderr # there are still outgoing changesets stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url) assert stdout != '' # set prechangegroup hook to exception throwing method db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook') meta.Session().commit() # re-try to push stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True) if vt is HgHttpVcsTest: # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr elif vt is HgSshVcsTest: assert 'remote: Exception: exception_test_hook threw an exception' in stdout else: assert False # there are still outgoing changesets stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url) assert stdout != '' # set prechangegroup hook to method that returns False db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook') meta.Session().commit() # re-try to push stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True) assert 'passing_test_hook succeeded' in stdout + stderr assert 'Traceback' not in stdout + stderr assert 'prechangegroup.testhook hook failed' not in stdout + stderr # no more outgoing changesets stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url) assert stdout == '' assert stderr == '' def test_add_submodule_git(self, webserver, testfork): dest_dir = _get_tmp_dir() clone_url = GitHttpVcsTest.repo_url_param(webserver, base.GIT_REPO) fork_url = GitHttpVcsTest.repo_url_param(webserver, testfork['git']) # add submodule stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', fork_url, dest_dir) stdout, stderr = Command(dest_dir).execute('git submodule add', clone_url, 'testsubmodule') stdout, stderr = Command(dest_dir).execute('git commit -am "added testsubmodule pointing to', clone_url, '"') stdout, stderr = Command(dest_dir).execute('git push', fork_url, 'master') # check for testsubmodule link in files page self.log_user() response = self.app.get(base.url(controller='files', action='index', repo_name=testfork['git'], revision='tip', f_path='/')) # check _repo_files_url that will be used to reload as AJAX response.mustcontain('var _repo_files_url = ("/%s/files/");' % testfork['git']) response.mustcontain('testsubmodule @ ' % clone_url) # check that following a submodule link actually works - and redirects response = self.app.get(base.url(controller='files', action='index', repo_name=testfork['git'], revision='tip', f_path='/testsubmodule'), status=302) assert response.location == clone_url