import time

from rhodecode.tests import *
from rhodecode.model.db import Repository
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel


class TestForksController(TestController):
    """Functional tests for the ``forks`` controller.

    NOTE(review): the ``test_z*`` methods appear to rely on alphabetical
    execution order -- ``test_zz_*`` and ``test_zzz_*`` assert that exactly
    one fork exists, presumably the one left behind by
    ``test_z_fork_create``. Confirm before renaming or reordering tests.
    """

    def setUp(self):
        """Create the dedicated user used by the permission tests."""
        self.username = u'forkuser'
        self.password = u'qweqwe'
        self.u1 = UserModel().create_or_update(
            username=self.username, password=self.password,
            email=u'fork_king@rhodecode.org', name=u'u1', lastname=u'u1'
        )
        self.Session.commit()

    def tearDown(self):
        """Remove the user created in ``setUp``."""
        self.Session.delete(self.u1)
        self.Session.commit()

    def _create_fork(self, repo_name, fork_name, description):
        """POST the fork-creation form for *repo_name* and return the response."""
        org_repo = Repository.get_by_repo_name(repo_name)
        return self.app.post(
            url(controller='forks', action='fork_create',
                repo_name=repo_name),
            {'repo_name': fork_name,
             'repo_group': '',
             'fork_parent_id': org_repo.repo_id,
             'repo_type': 'hg',
             'description': description,
             'private': 'False'})

    def _get_forks_page(self, repo_name):
        """GET the forks listing page of *repo_name* and return the response."""
        return self.app.get(url(controller='forks', action='forks',
                                repo_name=repo_name))

    def _single_fork(self):
        """Return the only forked repository in the database.

        Fails the test unless exactly one repository has a ``fork_id`` set.
        """
        # ``!= None`` is intentional: SQLAlchemy overloads the operator to
        # emit ``IS NOT NULL``; ``is not None`` would be evaluated by Python.
        forks = self.Session.query(Repository)\
            .filter(Repository.fork_id != None)\
            .all()  # noqa: E711
        self.assertEqual(1, len(forks))
        return forks[0]

    def test_index(self):
        """The forks page of a repository without forks says so."""
        self.log_user()
        response = self._get_forks_page(HG_REPO)

        response.mustcontain("""There are no forks yet""")

    def test_index_with_fork(self):
        """A freshly created fork is listed on the parent's forks page."""
        self.log_user()

        # create a fork
        fork_name = HG_FORK
        description = 'fork of vcs test'
        repo_name = HG_REPO
        self._create_fork(repo_name, fork_name, description)

        try:
            response = self._get_forks_page(repo_name)
            # The original check applied ``% fork_name`` to a literal with
            # no placeholder, which raises TypeError; assert on the fork
            # name itself instead.
            response.mustcontain(fork_name)
        finally:
            # remove this fork even when the assertion fails, so later
            # tests do not see a leftover fork
            self.app.delete(url('repo', repo_name=fork_name))

    def test_z_fork_create(self):
        """Forking flashes a success message, stores the fork and marks it."""
        self.log_user()
        fork_name = HG_FORK
        description = 'fork of vcs test'
        repo_name = HG_REPO
        response = self._create_fork(repo_name, fork_name, description)

        # test if we have a message that fork is ok
        expected_flash = 'forked %s repository as %s' % (repo_name, fork_name)
        self.assertTrue(expected_flash in response.session['flash'][0])

        # test if the fork was created in the database
        fork_repo = self.Session.query(Repository)\
            .filter(Repository.repo_name == fork_name).one()

        self.assertEqual(fork_repo.repo_name, fork_name)
        self.assertEqual(fork_repo.fork.repo_name, repo_name)

        # test if fork is visible in the list ?
        response = response.follow()

        # check if fork is marked as fork
        # wait for cache to expire
        time.sleep(10)
        response = self.app.get(url(controller='summary', action='index',
                                    repo_name=fork_name))

        response.mustcontain('Fork of %s' % repo_name)

    def test_zz_fork_permission_page(self):
        """A user with read permission on the fork sees it in the list."""
        usr = self.log_user(self.username, self.password)['user_id']
        repo_name = HG_REPO
        fork = self._single_fork()

        # set read permissions for this
        RepoModel().grant_user_permission(repo=fork,
                                          user=usr,
                                          perm='repository.read')
        self.Session.commit()

        response = self._get_forks_page(repo_name)

        # The fork's description is rendered on the page.
        response.mustcontain('fork of vcs test')

    def test_zzz_fork_permission_page(self):
        """A user with no permission on the fork does not see it listed."""
        usr = self.log_user(self.username, self.password)['user_id']
        repo_name = HG_REPO
        fork = self._single_fork()

        # set none
        RepoModel().grant_user_permission(repo=fork,
                                          user=usr, perm='repository.none')
        self.Session.commit()

        # fork shouldn't be there
        response = self._get_forks_page(repo_name)
        response.mustcontain('There are no forks yet')