# -*- coding: utf-8 -*- # Copyright (C) 2010-2017 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import pytest from rhodecode.tests import * from rhodecode.tests.fixture import Fixture from rhodecode.lib import helpers as h from rhodecode.model.db import Repository from rhodecode.model.repo import RepoModel from rhodecode.model.user import UserModel from rhodecode.model.meta import Session fixture = Fixture() def route_path(name, params=None, **kwargs): import urllib base_url = { 'repo_summary': '/{repo_name}', 'repo_creating_check': '/{repo_name}/repo_creating_check', }[name].format(**kwargs) if params: base_url = '{}?{}'.format(base_url, urllib.urlencode(params)) return base_url class _BaseTest(TestController): REPO = None REPO_TYPE = None NEW_REPO = None REPO_FORK = None @pytest.fixture(autouse=True) def prepare(self, request, pylonsapp): self.username = u'forkuser' self.password = u'qweqwe' self.u1 = fixture.create_user(self.username, password=self.password, email=u'fork_king@rhodecode.org') Session().commit() self.u1id = self.u1.user_id request.addfinalizer(self.cleanup) def cleanup(self): u1 = UserModel().get(self.u1id) Session().delete(u1) Session().commit() def test_index(self): self.log_user() repo_name = self.REPO response = self.app.get(url(controller='forks', action='forks', repo_name=repo_name)) response.mustcontain("""There are no forks yet""") def test_no_permissions_to_fork(self): usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)['user_id'] user_model = UserModel() user_model.revoke_perm(usr, 'hg.fork.repository') user_model.grant_perm(usr, 'hg.fork.none') u = UserModel().get(usr) u.inherit_default_permissions = False Session().commit() # try create a fork repo_name = self.REPO self.app.post( url(controller='forks', action='fork_create', repo_name=repo_name), {'csrf_token': self.csrf_token}, status=404) def test_index_with_fork(self): self.log_user() # create a fork fork_name = self.REPO_FORK description = 'fork of vcs test' repo_name = self.REPO source_repo = Repository.get_by_repo_name(repo_name) creation_args = { 'repo_name': fork_name, 'repo_group': '', 'fork_parent_id': source_repo.repo_id, 'repo_type': self.REPO_TYPE, 'description': description, 'private': 'False', 'landing_rev': 'rev:tip', 'csrf_token': self.csrf_token, } self.app.post(url(controller='forks', action='fork_create', repo_name=repo_name), creation_args) response = self.app.get(url(controller='forks', action='forks', repo_name=repo_name)) response.mustcontain( """%s""" % (fork_name, fork_name) ) # remove this fork fixture.destroy_repo(fork_name) def test_fork_create_into_group(self): self.log_user() group = fixture.create_repo_group('vc') group_id = group.group_id fork_name = self.REPO_FORK fork_name_full = 'vc/%s' % fork_name description = 'fork of vcs test' repo_name = self.REPO source_repo = Repository.get_by_repo_name(repo_name) creation_args = { 'repo_name': fork_name, 'repo_group': group_id, 'fork_parent_id': source_repo.repo_id, 'repo_type': self.REPO_TYPE, 'description': description, 'private': 'False', 'landing_rev': 'rev:tip', 'csrf_token': self.csrf_token, } self.app.post(url(controller='forks', action='fork_create', repo_name=repo_name), creation_args) repo = Repository.get_by_repo_name(fork_name_full) assert repo.fork.repo_name == self.REPO # run the check page that triggers the flash message response = self.app.get( route_path('repo_creating_check', repo_name=fork_name_full)) # test if we have a message that fork is ok assert_session_flash(response, 'Forked repository %s as %s' % (repo_name, fork_name_full, fork_name_full)) # test if the fork was created in the database fork_repo = Session().query(Repository)\ .filter(Repository.repo_name == fork_name_full).one() assert fork_repo.repo_name == fork_name_full assert fork_repo.fork.repo_name == repo_name # test if the repository is visible in the list ? response = self.app.get(h.route_path('repo_summary', repo_name=fork_name_full)) response.mustcontain(fork_name_full) response.mustcontain(self.REPO_TYPE) response.mustcontain('Fork of') response.mustcontain('%s' % (repo_name, repo_name)) fixture.destroy_repo(fork_name_full) fixture.destroy_repo_group(group_id) def test_z_fork_create(self): self.log_user() fork_name = self.REPO_FORK description = 'fork of vcs test' repo_name = self.REPO source_repo = Repository.get_by_repo_name(repo_name) creation_args = { 'repo_name': fork_name, 'repo_group': '', 'fork_parent_id': source_repo.repo_id, 'repo_type': self.REPO_TYPE, 'description': description, 'private': 'False', 'landing_rev': 'rev:tip', 'csrf_token': self.csrf_token, } self.app.post(url(controller='forks', action='fork_create', repo_name=repo_name), creation_args) repo = Repository.get_by_repo_name(self.REPO_FORK) assert repo.fork.repo_name == self.REPO # run the check page that triggers the flash message response = self.app.get( route_path('repo_creating_check', repo_name=fork_name)) # test if we have a message that fork is ok assert_session_flash(response, 'Forked repository %s as %s' % (repo_name, fork_name, fork_name)) # test if the fork was created in the database fork_repo = Session().query(Repository)\ .filter(Repository.repo_name == fork_name).one() assert fork_repo.repo_name == fork_name assert fork_repo.fork.repo_name == repo_name # test if the repository is visible in the list ? response = self.app.get(h.route_path('repo_summary', repo_name=fork_name)) response.mustcontain(fork_name) response.mustcontain(self.REPO_TYPE) response.mustcontain('Fork of') response.mustcontain('%s' % (repo_name, repo_name)) def test_zz_fork_permission_page(self): usr = self.log_user(self.username, self.password)['user_id'] repo_name = self.REPO forks = Repository.query()\ .filter(Repository.repo_type == self.REPO_TYPE)\ .filter(Repository.fork_id != None).all() assert 1 == len(forks) # set read permissions for this RepoModel().grant_user_permission(repo=forks[0], user=usr, perm='repository.read') Session().commit() response = self.app.get(url(controller='forks', action='forks', repo_name=repo_name)) response.mustcontain('fork of vcs test') def test_zzz_fork_permission_page(self): usr = self.log_user(self.username, self.password)['user_id'] repo_name = self.REPO forks = Repository.query()\ .filter(Repository.repo_type == self.REPO_TYPE)\ .filter(Repository.fork_id != None).all() assert 1 == len(forks) # set none RepoModel().grant_user_permission(repo=forks[0], user=usr, perm='repository.none') Session().commit() # fork shouldn't be there response = self.app.get(url(controller='forks', action='forks', repo_name=repo_name)) response.mustcontain('There are no forks yet') class TestGIT(_BaseTest): REPO = GIT_REPO NEW_REPO = NEW_GIT_REPO REPO_TYPE = 'git' REPO_FORK = GIT_FORK class TestHG(_BaseTest): REPO = HG_REPO NEW_REPO = NEW_HG_REPO REPO_TYPE = 'hg' REPO_FORK = HG_FORK @pytest.mark.usefixtures('app', 'autologin_user') @pytest.mark.skip_backends('git','hg') class TestSVNFork(object): def test_fork_redirects(self, backend): denied_actions = ['fork','fork_create'] for action in denied_actions: response = self.app.get(url( controller='forks', action=action, repo_name=backend.repo_name)) assert response.status_int == 302 # Not allowed, redirect to the summary redirected = response.follow() summary_url = h.route_path('repo_summary', repo_name=backend.repo_name) # URL adds leading slash and path doesn't have it assert redirected.request.path == summary_url