##// END OF EJS Templates
feat(configs): deprecared old hooks protocol and ssh wrapper....
feat(configs): deprecared old hooks protocol and ssh wrapper. New defaults are now set on v2 keys, so previous installation are automatically set to new keys. Fallback mode is still available.

File last commit:

r5459:7f730862 default
r5496:cab50adf default
Show More
test_vcs_operations_hg.py
226 lines | 9.0 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Test suite for making push/pull operations, on specially modified INI files
.. important::
You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
to redirect things to stderr instead of stdout.
"""
import time
import pytest
from rhodecode.lib import rc_cache
from rhodecode.model.db import Repository, UserIpMap, CacheKey
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.tests import (GIT_REPO, HG_REPO, TEST_USER_ADMIN_LOGIN)
from rhodecode.tests.vcs_operations import (
Command, _check_proper_clone, _add_files_and_push, HG_REPO_WITH_GROUP)
@pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
class TestVCSOperations(object):
def test_clone_hg_repo_by_admin(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(HG_REPO)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
_check_proper_clone(stdout, stderr, 'hg')
def test_clone_hg_repo_by_admin_pull_protocol(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(HG_REPO)
stdout, stderr = Command('/tmp').execute(
'hg clone --pull', clone_url, tmpdir.strpath)
_check_proper_clone(stdout, stderr, 'hg')
def test_clone_hg_repo_by_admin_pull_stream_protocol(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(HG_REPO)
stdout, stderr = Command('/tmp').execute(
'hg clone --pull --stream', clone_url, tmpdir.strpath)
assert 'files to transfer,' in stdout
assert 'transferred 1.' in stdout
assert '114 files updated,' in stdout
def test_clone_hg_repo_by_id_by_admin(self, rc_web_server, tmpdir):
repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
clone_url = rc_web_server.repo_clone_url('_%s' % repo_id)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
_check_proper_clone(stdout, stderr, 'hg')
def test_clone_hg_repo_with_group_by_admin(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(HG_REPO_WITH_GROUP)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
_check_proper_clone(stdout, stderr, 'hg')
def test_clone_wrong_credentials_hg(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(HG_REPO, passwd='bad!')
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
assert 'abort: authorization failed' in stderr
def test_clone_git_dir_as_hg(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(GIT_REPO)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
assert 'HTTP Error 404: Not Found' in stderr
def test_clone_non_existing_path_hg(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url('trololo')
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
assert 'HTTP Error 404: Not Found' in stderr
def test_clone_hg_with_slashes(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url('//' + HG_REPO)
stdout, stderr = Command('/tmp').execute('hg clone', clone_url, tmpdir.strpath)
assert 'HTTP Error 404: Not Found' in stderr
def test_clone_existing_path_hg_not_in_database(
self, rc_web_server, tmpdir, fs_repo_only):
db_name = fs_repo_only('not-in-db-hg', repo_type='hg')
clone_url = rc_web_server.repo_clone_url(db_name)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
assert 'HTTP Error 404: Not Found' in stderr
def test_clone_existing_path_hg_not_in_database_different_scm(
self, rc_web_server, tmpdir, fs_repo_only):
db_name = fs_repo_only('not-in-db-git', repo_type='git')
clone_url = rc_web_server.repo_clone_url(db_name)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
assert 'HTTP Error 404: Not Found' in stderr
def test_clone_non_existing_store_path_hg(self, rc_web_server, tmpdir, user_util):
repo = user_util.create_repo()
clone_url = rc_web_server.repo_clone_url(repo.repo_name)
# Damage repo by removing it's folder
RepoModel()._delete_filesystem_repo(repo)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
assert 'HTTP Error 404: Not Found' in stderr
def test_push_new_file_hg(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(HG_REPO)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
stdout, stderr = _add_files_and_push(
'hg', tmpdir.strpath, clone_url=clone_url)
assert 'pushing to' in stdout
assert 'size summary' in stdout
def test_push_invalidates_cache(self, rc_web_server, tmpdir):
hg_repo = Repository.get_by_repo_name(HG_REPO)
# init cache objects
CacheKey.delete_all_cache()
repo_namespace_key = CacheKey.REPO_INVALIDATION_NAMESPACE.format(repo_id=hg_repo.repo_id)
inv_context_manager = rc_cache.InvalidationContext(key=repo_namespace_key)
with inv_context_manager as invalidation_context:
# __enter__ will create and register cache objects
pass
cache_keys = hg_repo.cache_keys
assert cache_keys != []
old_ids = [x.cache_state_uid for x in cache_keys]
# clone to init cache
clone_url = rc_web_server.repo_clone_url(hg_repo.repo_name)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
cache_keys = hg_repo.cache_keys
assert cache_keys != []
for key in cache_keys:
assert key.cache_active is True
# PUSH that should trigger invalidation cache
stdout, stderr = _add_files_and_push(
'hg', tmpdir.strpath, clone_url=clone_url, files_no=1)
# flush...
Session().commit()
hg_repo = Repository.get_by_repo_name(HG_REPO)
cache_keys = hg_repo.cache_keys
assert cache_keys != []
new_ids = [x.cache_state_uid for x in cache_keys]
assert new_ids != old_ids
def test_push_wrong_credentials_hg(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(HG_REPO)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
push_url = rc_web_server.repo_clone_url(
HG_REPO, user='bad', passwd='name')
stdout, stderr = _add_files_and_push(
'hg', tmpdir.strpath, clone_url=push_url)
assert 'abort: authorization failed' in stderr
def test_push_back_to_wrong_url_hg(self, rc_web_server, tmpdir):
clone_url = rc_web_server.repo_clone_url(HG_REPO)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
stdout, stderr = _add_files_and_push(
'hg', tmpdir.strpath,
clone_url=rc_web_server.repo_clone_url('not-existing'))
assert 'HTTP Error 404: Not Found' in stderr
def test_ip_restriction_hg(self, rc_web_server, tmpdir):
user_model = UserModel()
try:
user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
Session().commit()
time.sleep(2)
clone_url = rc_web_server.repo_clone_url(HG_REPO)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
assert 'abort: HTTP Error 403: Forbidden' in stderr
finally:
# release IP restrictions
for ip in UserIpMap.getAll():
UserIpMap.delete(ip.ip_id)
Session().commit()
time.sleep(2)
stdout, stderr = Command('/tmp').execute(
'hg clone', clone_url, tmpdir.strpath)
_check_proper_clone(stdout, stderr, 'hg')