test_admin_repos.py
639 lines
| 28.0 KiB
| text/x-python
|
PythonLexer
r1398 | # -*- coding: utf-8 -*- | |||
r1037 | import os | |||
Bradley M. Kuhn
|
r4116 | import mock | ||
r3550 | import urllib | |||
r2007 | from rhodecode.lib import vcs | |||
r3629 | from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\ | |||
Permission | ||||
Bradley M. Kuhn
|
r4116 | from rhodecode.model.user import UserModel | ||
r691 | from rhodecode.tests import * | |||
Bradley M. Kuhn
|
r4116 | from rhodecode.model.repo_group import RepoGroupModel | ||
r2529 | from rhodecode.model.repo import RepoModel | |||
Bradley M. Kuhn
|
r4116 | from rhodecode.model.scm import ScmModel | ||
r3629 | from rhodecode.model.meta import Session | |||
Bradley M. Kuhn
|
r4116 | from rhodecode.tests.fixture import Fixture, error_function | ||
r3647 | ||||
# Shared Fixture instance; the tests below use it to build repository form
# parameters and to create/destroy helper repositories.
fixture = Fixture()
r3629 | ||||
def _get_permission_for_user(user, repo):
    """
    Return all UserRepoToPerm rows that link ``user`` to ``repo``.

    :param user: username, resolved with ``User.get_by_username``
    :param repo: repository name, resolved with
        ``Repository.get_by_repo_name``
    :returns: whatever ``.all()`` yields for the filtered query
    """
    query = UserRepoToPerm.query()
    target_repo = Repository.get_by_repo_name(repo)
    query = query.filter(UserRepoToPerm.repository == target_repo)
    target_user = User.get_by_username(user)
    query = query.filter(UserRepoToPerm.user == target_user)
    return query.all()
r691 | ||||
r2459 | ||||
Bradley M. Kuhn
|
class _BaseTest(TestController):
    """
    Shared test cases for the admin repository controller.

    The class attributes below are ``None`` placeholders; the concrete
    ``TestAdminReposController*`` subclasses override them for one VCS
    backend each.
    """
    # name of the repository the show/edit/fork tests operate on
    REPO = None
    # backend identifier passed as ``repo_type`` when creating repositories
    REPO_TYPE = None
    # name used for repositories that the tests create themselves
    NEW_REPO = None
    # name of a repository of a different backend (see
    # test_set_fork_of_other_type_repo)
    OTHER_TYPE_REPO = None
    # identifier of that other backend
    OTHER_TYPE = None

    @classmethod
    def setup_class(cls):
        # no class-level preparation needed
        pass

    @classmethod
    def teardown_class(cls):
        # no class-level cleanup needed
        pass
r691 | ||||
def test_index(self): | ||||
self.log_user() | ||||
response = self.app.get(url('repos')) | ||||
Bradley M. Kuhn
|
r4116 | def test_create(self): | ||
r691 | self.log_user() | |||
Bradley M. Kuhn
|
r4116 | repo_name = self.NEW_REPO | ||
r691 | description = 'description for newly created repo' | |||
r3057 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_name=repo_name, | |||
Bradley M. Kuhn
|
r4116 | repo_type=self.REPO_TYPE, | ||
r3056 | repo_description=description)) | |||
Bradley M. Kuhn
|
r4116 | ## run the check page that triggers the flash message | ||
response = self.app.get(url('repo_check_home', repo_name=repo_name)) | ||||
self.assertEqual(response.json, {u'result': True}) | ||||
r2459 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | 'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (repo_name, repo_name)) | |||
r691 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repo was created in the database | ||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).one() | |||
r691 | ||||
r1366 | self.assertEqual(new_repo.repo_name, repo_name) | |||
self.assertEqual(new_repo.description, description) | ||||
r691 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository is visible in the list ? | ||
response = self.app.get(url('summary_home', repo_name=repo_name)) | ||||
response.mustcontain(repo_name) | ||||
response.mustcontain(self.REPO_TYPE) | ||||
r691 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository was created on filesystem | ||
r1037 | try: | |||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
r1037 | ||||
Bradley M. Kuhn
|
r4116 | RepoModel().delete(repo_name) | ||
Session().commit() | ||||
def test_create_non_ascii(self): | ||||
r1398 | self.log_user() | |||
non_ascii = "ąęł" | ||||
Bradley M. Kuhn
|
r4116 | repo_name = "%s%s" % (self.NEW_REPO, non_ascii) | ||
r1398 | repo_name_unicode = repo_name.decode('utf8') | |||
description = 'description for newly created repo' + non_ascii | ||||
description_unicode = description.decode('utf8') | ||||
r3056 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_name=repo_name, | |||
Bradley M. Kuhn
|
r4116 | repo_type=self.REPO_TYPE, | ||
r3056 | repo_description=description)) | |||
Bradley M. Kuhn
|
r4116 | ## run the check page that triggers the flash message | ||
response = self.app.get(url('repo_check_home', repo_name=repo_name)) | ||||
self.assertEqual(response.json, {u'result': True}) | ||||
r1398 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | u'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (urllib.quote(repo_name), repo_name_unicode)) | |||
Bradley M. Kuhn
|
r4116 | # test if the repo was created in the database | ||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name_unicode).one() | |||
r1398 | ||||
self.assertEqual(new_repo.repo_name, repo_name_unicode) | ||||
self.assertEqual(new_repo.description, description_unicode) | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository is visible in the list ? | ||
response = self.app.get(url('summary_home', repo_name=repo_name)) | ||||
response.mustcontain(repo_name) | ||||
response.mustcontain(self.REPO_TYPE) | ||||
r1398 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository was created on filesystem | ||
r1398 | try: | |||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
r1398 | ||||
Bradley M. Kuhn
|
r4116 | def test_create_in_group(self): | ||
r2529 | self.log_user() | |||
## create GROUP | ||||
Bradley M. Kuhn
|
r4116 | group_name = 'sometest_%s' % self.REPO_TYPE | ||
gr = RepoGroupModel().create(group_name=group_name, | ||||
group_description='test', | ||||
owner=TEST_USER_ADMIN_LOGIN) | ||||
r3797 | Session().commit() | |||
r2529 | ||||
repo_name = 'ingroup' | ||||
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) | ||||
description = 'description for newly created repo' | ||||
r3056 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_name=repo_name, | |||
Bradley M. Kuhn
|
r4116 | repo_type=self.REPO_TYPE, | ||
repo_description=description, | ||||
repo_group=gr.group_id,)) | ||||
## run the check page that triggers the flash message | ||||
response = self.app.get(url('repo_check_home', repo_name=repo_name_full)) | ||||
self.assertEqual(response.json, {u'result': True}) | ||||
self.checkSessionFlash(response, | ||||
'Created repository <a href="/%s">%s</a>' | ||||
% (repo_name_full, repo_name_full)) | ||||
# test if the repo was created in the database | ||||
new_repo = Session().query(Repository)\ | ||||
.filter(Repository.repo_name == repo_name_full).one() | ||||
new_repo_id = new_repo.repo_id | ||||
self.assertEqual(new_repo.repo_name, repo_name_full) | ||||
self.assertEqual(new_repo.description, description) | ||||
# test if the repository is visible in the list ? | ||||
response = self.app.get(url('summary_home', repo_name=repo_name_full)) | ||||
response.mustcontain(repo_name_full) | ||||
response.mustcontain(self.REPO_TYPE) | ||||
inherited_perms = UserRepoToPerm.query()\ | ||||
.filter(UserRepoToPerm.repository_id == new_repo_id).all() | ||||
self.assertEqual(len(inherited_perms), 1) | ||||
# test if the repository was created on filesystem | ||||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full)) | ||||
except Exception: | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
self.fail('no repo %s in filesystem' % repo_name) | ||||
RepoModel().delete(repo_name_full) | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
def test_create_in_group_without_needed_permissions(self): | ||||
usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) | ||||
# revoke | ||||
user_model = UserModel() | ||||
# disable fork and create on default user | ||||
user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') | ||||
user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') | ||||
user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') | ||||
user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') | ||||
# disable on regular user | ||||
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') | ||||
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') | ||||
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') | ||||
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') | ||||
Session().commit() | ||||
## create GROUP | ||||
group_name = 'reg_sometest_%s' % self.REPO_TYPE | ||||
gr = RepoGroupModel().create(group_name=group_name, | ||||
group_description='test', | ||||
owner=TEST_USER_ADMIN_LOGIN) | ||||
Session().commit() | ||||
group_name_allowed = 'reg_sometest_allowed_%s' % self.REPO_TYPE | ||||
gr_allowed = RepoGroupModel().create(group_name=group_name_allowed, | ||||
group_description='test', | ||||
owner=TEST_USER_REGULAR_LOGIN) | ||||
Session().commit() | ||||
repo_name = 'ingroup' | ||||
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) | ||||
description = 'description for newly created repo' | ||||
response = self.app.post(url('repos'), | ||||
fixture._get_repo_create_params(repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=self.REPO_TYPE, | ||||
r3056 | repo_description=description, | |||
repo_group=gr.group_id,)) | ||||
Bradley M. Kuhn
|
r4116 | response.mustcontain('Invalid value') | ||
# user is allowed to create in this group | ||||
repo_name = 'ingroup' | ||||
repo_name_full = RepoGroup.url_sep().join([group_name_allowed, repo_name]) | ||||
description = 'description for newly created repo' | ||||
response = self.app.post(url('repos'), | ||||
fixture._get_repo_create_params(repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=self.REPO_TYPE, | ||||
repo_description=description, | ||||
repo_group=gr_allowed.group_id,)) | ||||
## run the check page that triggers the flash message | ||||
response = self.app.get(url('repo_check_home', repo_name=repo_name_full)) | ||||
self.assertEqual(response.json, {u'result': True}) | ||||
r2529 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | 'Created repository <a href="/%s">%s</a>' | ||
Bradley M. Kuhn
|
r4116 | % (repo_name_full, repo_name_full)) | ||
# test if the repo was created in the database | ||||
r3797 | new_repo = Session().query(Repository)\ | |||
r2529 | .filter(Repository.repo_name == repo_name_full).one() | |||
Bradley M. Kuhn
|
r4116 | new_repo_id = new_repo.repo_id | ||
r2529 | ||||
self.assertEqual(new_repo.repo_name, repo_name_full) | ||||
self.assertEqual(new_repo.description, description) | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository is visible in the list ? | ||
response = self.app.get(url('summary_home', repo_name=repo_name_full)) | ||||
response.mustcontain(repo_name_full) | ||||
response.mustcontain(self.REPO_TYPE) | ||||
r2529 | ||||
Bradley M. Kuhn
|
r4116 | inherited_perms = UserRepoToPerm.query()\ | ||
.filter(UserRepoToPerm.repository_id == new_repo_id).all() | ||||
self.assertEqual(len(inherited_perms), 1) | ||||
r2529 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository was created on filesystem | ||
r2529 | try: | |||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full)) | ||||
r3631 | except Exception: | |||
Bradley M. Kuhn
|
r4116 | RepoGroupModel().delete(group_name) | ||
r3797 | Session().commit() | |||
r2529 | self.fail('no repo %s in filesystem' % repo_name) | |||
RepoModel().delete(repo_name_full) | ||||
Bradley M. Kuhn
|
r4116 | RepoGroupModel().delete(group_name) | ||
RepoGroupModel().delete(group_name_allowed) | ||||
Session().commit() | ||||
def test_create_in_group_inherit_permissions(self): | ||||
self.log_user() | ||||
## create GROUP | ||||
group_name = 'sometest_%s' % self.REPO_TYPE | ||||
gr = RepoGroupModel().create(group_name=group_name, | ||||
group_description='test', | ||||
owner=TEST_USER_ADMIN_LOGIN) | ||||
perm = Permission.get_by_key('repository.write') | ||||
RepoGroupModel().grant_user_permission(gr, TEST_USER_REGULAR_LOGIN, perm) | ||||
## add repo permissions | ||||
r3797 | Session().commit() | |||
r1037 | ||||
Bradley M. Kuhn
|
r4116 | repo_name = 'ingroup_inherited_%s' % self.REPO_TYPE | ||
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) | ||||
r691 | description = 'description for newly created repo' | |||
r3056 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
Bradley M. Kuhn
|
r4116 | repo_name=repo_name, | ||
repo_type=self.REPO_TYPE, | ||||
repo_description=description, | ||||
repo_group=gr.group_id, | ||||
repo_copy_permissions=True)) | ||||
## run the check page that triggers the flash message | ||||
response = self.app.get(url('repo_check_home', repo_name=repo_name_full)) | ||||
self.checkSessionFlash(response, | ||||
'Created repository <a href="/%s">%s</a>' | ||||
% (repo_name_full, repo_name_full)) | ||||
# test if the repo was created in the database | ||||
new_repo = Session().query(Repository)\ | ||||
.filter(Repository.repo_name == repo_name_full).one() | ||||
new_repo_id = new_repo.repo_id | ||||
self.assertEqual(new_repo.repo_name, repo_name_full) | ||||
self.assertEqual(new_repo.description, description) | ||||
# test if the repository is visible in the list ? | ||||
response = self.app.get(url('summary_home', repo_name=repo_name_full)) | ||||
response.mustcontain(repo_name_full) | ||||
response.mustcontain(self.REPO_TYPE) | ||||
# test if the repository was created on filesystem | ||||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full)) | ||||
except Exception: | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
self.fail('no repo %s in filesystem' % repo_name) | ||||
#check if inherited permissiona are applied | ||||
inherited_perms = UserRepoToPerm.query()\ | ||||
.filter(UserRepoToPerm.repository_id == new_repo_id).all() | ||||
self.assertEqual(len(inherited_perms), 2) | ||||
self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username | ||||
for x in inherited_perms]) | ||||
self.assertTrue('repository.write' in [x.permission.permission_name | ||||
for x in inherited_perms]) | ||||
RepoModel().delete(repo_name_full) | ||||
RepoGroupModel().delete(group_name) | ||||
Session().commit() | ||||
def test_create_remote_repo_wrong_clone_uri(self): | ||||
self.log_user() | ||||
repo_name = self.NEW_REPO | ||||
description = 'description for newly created repo' | ||||
response = self.app.post(url('repos'), | ||||
fixture._get_repo_create_params(repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=self.REPO_TYPE, | ||||
repo_description=description, | ||||
clone_uri='http://127.0.0.1/repo')) | ||||
response.mustcontain('invalid clone url') | ||||
def test_create_remote_repo_wrong_clone_uri_hg_svn(self): | ||||
self.log_user() | ||||
repo_name = self.NEW_REPO | ||||
description = 'description for newly created repo' | ||||
response = self.app.post(url('repos'), | ||||
fixture._get_repo_create_params(repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=self.REPO_TYPE, | ||||
repo_description=description, | ||||
clone_uri='svn+http://127.0.0.1/repo')) | ||||
response.mustcontain('invalid clone url') | ||||
def test_delete(self): | ||||
self.log_user() | ||||
repo_name = 'vcs_test_new_to_delete_%s' % self.REPO_TYPE | ||||
description = 'description for newly created repo' | ||||
response = self.app.post(url('repos'), | ||||
fixture._get_repo_create_params(repo_private=False, | ||||
repo_type=self.REPO_TYPE, | ||||
r3056 | repo_name=repo_name, | |||
repo_description=description)) | ||||
Bradley M. Kuhn
|
r4116 | ## run the check page that triggers the flash message | ||
response = self.app.get(url('repo_check_home', repo_name=repo_name)) | ||||
r2459 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | 'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (repo_name, repo_name)) | |||
Bradley M. Kuhn
|
r4116 | # test if the repo was created in the database | ||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).one() | |||
r691 | ||||
r2459 | self.assertEqual(new_repo.repo_name, repo_name) | |||
self.assertEqual(new_repo.description, description) | ||||
r691 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository is visible in the list ? | ||
response = self.app.get(url('summary_home', repo_name=repo_name)) | ||||
r2459 | response.mustcontain(repo_name) | |||
Bradley M. Kuhn
|
r4116 | response.mustcontain(self.REPO_TYPE) | ||
r691 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository was created on filesystem | ||
r2459 | try: | |||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
r691 | ||||
response = self.app.delete(url('repo', repo_name=repo_name)) | ||||
r3640 | self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name)) | |||
r691 | ||||
response.follow() | ||||
#check if repo was deleted from db | ||||
r3797 | deleted_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).scalar() | |||
r1366 | ||||
self.assertEqual(deleted_repo, None) | ||||
r691 | ||||
r2459 | self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)), | |||
False) | ||||
Bradley M. Kuhn
|
r4116 | def test_delete_non_ascii(self): | ||
r2459 | self.log_user() | |||
Bradley M. Kuhn
|
r4116 | non_ascii = "ąęł" | ||
repo_name = "%s%s" % (self.NEW_REPO, non_ascii) | ||||
repo_name_unicode = repo_name.decode('utf8') | ||||
description = 'description for newly created repo' + non_ascii | ||||
description_unicode = description.decode('utf8') | ||||
r3057 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_name=repo_name, | |||
Bradley M. Kuhn
|
r4116 | repo_type=self.REPO_TYPE, | ||
r3056 | repo_description=description)) | |||
Bradley M. Kuhn
|
r4116 | ## run the check page that triggers the flash message | ||
response = self.app.get(url('repo_check_home', repo_name=repo_name)) | ||||
self.assertEqual(response.json, {u'result': True}) | ||||
r2459 | self.checkSessionFlash(response, | |||
Bradley M. Kuhn
|
r4116 | u'Created repository <a href="/%s">%s</a>' | ||
% (urllib.quote(repo_name), repo_name_unicode)) | ||||
# test if the repo was created in the database | ||||
r3797 | new_repo = Session().query(Repository)\ | |||
Bradley M. Kuhn
|
r4116 | .filter(Repository.repo_name == repo_name_unicode).one() | ||
r2459 | ||||
Bradley M. Kuhn
|
r4116 | self.assertEqual(new_repo.repo_name, repo_name_unicode) | ||
self.assertEqual(new_repo.description, description_unicode) | ||||
r2459 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository is visible in the list ? | ||
response = self.app.get(url('summary_home', repo_name=repo_name)) | ||||
response.mustcontain(repo_name) | ||||
response.mustcontain(self.REPO_TYPE) | ||||
r2459 | ||||
Bradley M. Kuhn
|
r4116 | # test if the repository was created on filesystem | ||
r2459 | try: | |||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
response = self.app.delete(url('repo', repo_name=repo_name)) | ||||
Bradley M. Kuhn
|
r4116 | self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode)) | ||
r2459 | response.follow() | |||
#check if repo was deleted from db | ||||
r3797 | deleted_repo = Session().query(Repository)\ | |||
Bradley M. Kuhn
|
r4116 | .filter(Repository.repo_name == repo_name_unicode).scalar() | ||
r2459 | ||||
self.assertEqual(deleted_repo, None) | ||||
self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)), | ||||
False) | ||||
r1366 | ||||
    def test_delete_repo_with_group(self):
        # TODO: not implemented yet; placeholder for a test that deletes a
        # repository living inside a repository group. Always passes for now.
        pass
r691 | ||||
def test_delete_browser_fakeout(self): | ||||
Bradley M. Kuhn
|
r4116 | response = self.app.post(url('repo', repo_name=self.REPO), | ||
r1366 | params=dict(_method='delete')) | |||
r691 | ||||
Bradley M. Kuhn
|
r4116 | def test_show(self): | ||
r691 | self.log_user() | |||
Bradley M. Kuhn
|
r4116 | response = self.app.get(url('repo', repo_name=self.REPO)) | ||
r691 | ||||
def test_edit(self): | ||||
Bradley M. Kuhn
|
r4116 | response = self.app.get(url('edit_repo', repo_name=self.REPO)) | ||
r3629 | ||||
def test_set_private_flag_sets_default_to_none(self): | ||||
self.log_user() | ||||
#initially repository perm should be read | ||||
Bradley M. Kuhn
|
r4116 | perm = _get_permission_for_user(user='default', repo=self.REPO) | ||
r3629 | self.assertTrue(len(perm), 1) | |||
self.assertEqual(perm[0].permission.permission_name, 'repository.read') | ||||
Bradley M. Kuhn
|
r4116 | self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False) | ||
r3629 | ||||
Bradley M. Kuhn
|
r4116 | response = self.app.put(url('repo', repo_name=self.REPO), | ||
r3647 | fixture._get_repo_create_params(repo_private=1, | |||
Bradley M. Kuhn
|
r4116 | repo_name=self.REPO, | ||
repo_type=self.REPO_TYPE, | ||||
r3629 | user=TEST_USER_ADMIN_LOGIN)) | |||
self.checkSessionFlash(response, | ||||
Bradley M. Kuhn
|
r4116 | msg='Repository %s updated successfully' % (self.REPO)) | ||
self.assertEqual(Repository.get_by_repo_name(self.REPO).private, True) | ||||
r3629 | ||||
#now the repo default permission should be None | ||||
Bradley M. Kuhn
|
r4116 | perm = _get_permission_for_user(user='default', repo=self.REPO) | ||
r3629 | self.assertTrue(len(perm), 1) | |||
self.assertEqual(perm[0].permission.permission_name, 'repository.none') | ||||
Bradley M. Kuhn
|
r4116 | response = self.app.put(url('repo', repo_name=self.REPO), | ||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
Bradley M. Kuhn
|
r4116 | repo_name=self.REPO, | ||
repo_type=self.REPO_TYPE, | ||||
r3629 | user=TEST_USER_ADMIN_LOGIN)) | |||
self.checkSessionFlash(response, | ||||
Bradley M. Kuhn
|
r4116 | msg='Repository %s updated successfully' % (self.REPO)) | ||
self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False) | ||||
r3629 | ||||
#we turn off private now the repo default permission should stay None | ||||
Bradley M. Kuhn
|
r4116 | perm = _get_permission_for_user(user='default', repo=self.REPO) | ||
r3629 | self.assertTrue(len(perm), 1) | |||
self.assertEqual(perm[0].permission.permission_name, 'repository.none') | ||||
#update this permission back | ||||
perm[0].permission = Permission.get_by_key('repository.read') | ||||
Session().add(perm[0]) | ||||
Session().commit() | ||||
Bradley M. Kuhn
|
r4116 | |||
def test_set_repo_fork_has_no_self_id(self): | ||||
self.log_user() | ||||
repo = Repository.get_by_repo_name(self.REPO) | ||||
response = self.app.get(url('edit_repo_advanced', repo_name=self.REPO)) | ||||
opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id | ||||
response.mustcontain(no=[opt]) | ||||
def test_set_fork_of_other_repo(self): | ||||
self.log_user() | ||||
other_repo = 'other_%s' % self.REPO_TYPE | ||||
fixture.create_repo(other_repo, repo_type=self.REPO_TYPE) | ||||
repo = Repository.get_by_repo_name(self.REPO) | ||||
repo2 = Repository.get_by_repo_name(other_repo) | ||||
response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO), | ||||
params=dict(id_fork_of=repo2.repo_id)) | ||||
repo = Repository.get_by_repo_name(self.REPO) | ||||
repo2 = Repository.get_by_repo_name(other_repo) | ||||
self.checkSessionFlash(response, | ||||
'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name)) | ||||
assert repo.fork == repo2 | ||||
response = response.follow() | ||||
# check if given repo is selected | ||||
opt = """<option value="%s" selected="selected">%s</option>""" % ( | ||||
repo2.repo_id, repo2.repo_name) | ||||
response.mustcontain(opt) | ||||
fixture.destroy_repo(other_repo, forks='detach') | ||||
def test_set_fork_of_other_type_repo(self): | ||||
self.log_user() | ||||
repo = Repository.get_by_repo_name(self.REPO) | ||||
repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO) | ||||
response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO), | ||||
params=dict(id_fork_of=repo2.repo_id)) | ||||
repo = Repository.get_by_repo_name(self.REPO) | ||||
repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO) | ||||
self.checkSessionFlash(response, | ||||
'Cannot set repository as fork of repository with other type') | ||||
def test_set_fork_of_none(self): | ||||
self.log_user() | ||||
## mark it as None | ||||
response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO), | ||||
params=dict(id_fork_of=None)) | ||||
repo = Repository.get_by_repo_name(self.REPO) | ||||
repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO) | ||||
self.checkSessionFlash(response, | ||||
'Marked repo %s as fork of %s' | ||||
% (repo.repo_name, "Nothing")) | ||||
assert repo.fork is None | ||||
def test_set_fork_of_same_repo(self): | ||||
self.log_user() | ||||
repo = Repository.get_by_repo_name(self.REPO) | ||||
response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO), | ||||
params=dict(id_fork_of=repo.repo_id)) | ||||
self.checkSessionFlash(response, | ||||
'An error occurred during this operation') | ||||
def test_create_on_top_level_without_permissions(self): | ||||
usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) | ||||
# revoke | ||||
user_model = UserModel() | ||||
# disable fork and create on default user | ||||
user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') | ||||
user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') | ||||
user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') | ||||
user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') | ||||
# disable on regular user | ||||
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') | ||||
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') | ||||
user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') | ||||
user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') | ||||
Session().commit() | ||||
user = User.get(usr['user_id']) | ||||
repo_name = self.NEW_REPO+'no_perms' | ||||
description = 'description for newly created repo' | ||||
response = self.app.post(url('repos'), | ||||
fixture._get_repo_create_params(repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=self.REPO_TYPE, | ||||
repo_description=description)) | ||||
response.mustcontain('no permission to create repository in root location') | ||||
RepoModel().delete(repo_name) | ||||
Session().commit() | ||||
@mock.patch.object(RepoModel, '_create_filesystem_repo', error_function) | ||||
def test_create_repo_when_filesystem_op_fails(self): | ||||
self.log_user() | ||||
repo_name = self.NEW_REPO | ||||
description = 'description for newly created repo' | ||||
response = self.app.post(url('repos'), | ||||
fixture._get_repo_create_params(repo_private=False, | ||||
repo_name=repo_name, | ||||
repo_type=self.REPO_TYPE, | ||||
repo_description=description)) | ||||
self.checkSessionFlash(response, | ||||
'Error creating repository %s' % repo_name) | ||||
# repo must not be in db | ||||
repo = Repository.get_by_repo_name(repo_name) | ||||
self.assertEqual(repo, None) | ||||
# repo must not be in filesystem ! | ||||
self.assertFalse(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name))) | ||||
class TestAdminReposControllerGIT(_BaseTest):
    """Run the shared ``_BaseTest`` cases with ``REPO_TYPE = 'git'``."""
    REPO = GIT_REPO
    REPO_TYPE = 'git'
    NEW_REPO = NEW_GIT_REPO
    # the Mercurial test repository serves as the "other type" counterpart
    OTHER_TYPE_REPO = HG_REPO
    OTHER_TYPE = 'hg'
class TestAdminReposControllerHG(_BaseTest):
    """Run the shared ``_BaseTest`` cases with ``REPO_TYPE = 'hg'``."""
    REPO = HG_REPO
    REPO_TYPE = 'hg'
    NEW_REPO = NEW_HG_REPO
    # the git test repository serves as the "other type" counterpart
    OTHER_TYPE_REPO = GIT_REPO
    OTHER_TYPE = 'git'