test_admin_repos.py
371 lines
| 14.6 KiB
| text/x-python
|
PythonLexer
r1398 | # -*- coding: utf-8 -*- | |||
r1037 | import os | |||
r3550 | import urllib | |||
r2007 | from rhodecode.lib import vcs | |||
r3629 | from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\ | |||
Permission | ||||
r691 | from rhodecode.tests import * | |||
r2529 | from rhodecode.model.repos_group import ReposGroupModel | |||
from rhodecode.model.repo import RepoModel | ||||
r3629 | from rhodecode.model.meta import Session | |||
r3647 | from rhodecode.tests.fixture import Fixture | |||
fixture = Fixture() | ||||
r3629 | ||||
def _get_permission_for_user(user, repo): | ||||
perm = UserRepoToPerm.query()\ | ||||
.filter(UserRepoToPerm.repository == | ||||
Repository.get_by_repo_name(repo))\ | ||||
.filter(UserRepoToPerm.user == User.get_by_username(user))\ | ||||
.all() | ||||
return perm | ||||
r691 | ||||
r2459 | ||||
r691 | class TestAdminReposController(TestController): | |||
def test_index(self): | ||||
self.log_user() | ||||
response = self.app.get(url('repos')) | ||||
# Test response... | ||||
def test_index_as_xml(self): | ||||
response = self.app.get(url('formatted_repos', format='xml')) | ||||
def test_create_hg(self): | ||||
self.log_user() | ||||
repo_name = NEW_HG_REPO | ||||
description = 'description for newly created repo' | ||||
r3057 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_name=repo_name, | |||
repo_description=description)) | ||||
r2459 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | 'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (repo_name, repo_name)) | |||
r691 | ||||
r1366 | #test if the repo was created in the database | |||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).one() | |||
r691 | ||||
r1366 | self.assertEqual(new_repo.repo_name, repo_name) | |||
self.assertEqual(new_repo.description, description) | ||||
r691 | ||||
#test if repository is visible in the list ? | ||||
response = response.follow() | ||||
r2459 | response.mustcontain(repo_name) | |||
r1037 | ||||
#test if repository was created on filesystem | ||||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
r1037 | ||||
r1398 | def test_create_hg_non_ascii(self): | |||
self.log_user() | ||||
non_ascii = "ąęł" | ||||
repo_name = "%s%s" % (NEW_HG_REPO, non_ascii) | ||||
repo_name_unicode = repo_name.decode('utf8') | ||||
description = 'description for newly created repo' + non_ascii | ||||
description_unicode = description.decode('utf8') | ||||
private = False | ||||
r3056 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_name=repo_name, | |||
repo_description=description)) | ||||
r1398 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | u'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (urllib.quote(repo_name), repo_name_unicode)) | |||
r1398 | #test if the repo was created in the database | |||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name_unicode).one() | |||
r1398 | ||||
self.assertEqual(new_repo.repo_name, repo_name_unicode) | ||||
self.assertEqual(new_repo.description, description_unicode) | ||||
#test if repository is visible in the list ? | ||||
response = response.follow() | ||||
r2459 | response.mustcontain(repo_name) | |||
r1398 | ||||
#test if repository was created on filesystem | ||||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
r1398 | ||||
r3675 | def test_create_hg_in_group(self): | |||
r2529 | self.log_user() | |||
## create GROUP | ||||
group_name = 'sometest' | ||||
gr = ReposGroupModel().create(group_name=group_name, | ||||
r3222 | group_description='test', | |||
owner=TEST_USER_ADMIN_LOGIN) | ||||
r3797 | Session().commit() | |||
r2529 | ||||
repo_name = 'ingroup' | ||||
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) | ||||
description = 'description for newly created repo' | ||||
r3056 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_name=repo_name, | |||
repo_description=description, | ||||
repo_group=gr.group_id,)) | ||||
r2529 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | 'Created repository <a href="/%s">%s</a>' | ||
r3673 | % (repo_name_full, repo_name)) | |||
r2529 | #test if the repo was created in the database | |||
r3797 | new_repo = Session().query(Repository)\ | |||
r2529 | .filter(Repository.repo_name == repo_name_full).one() | |||
self.assertEqual(new_repo.repo_name, repo_name_full) | ||||
self.assertEqual(new_repo.description, description) | ||||
#test if repository is visible in the list ? | ||||
response = response.follow() | ||||
response.mustcontain(repo_name_full) | ||||
#test if repository was created on filesystem | ||||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full)) | ||||
r3631 | except Exception: | |||
r2529 | ReposGroupModel().delete(group_name) | |||
r3797 | Session().commit() | |||
r2529 | self.fail('no repo %s in filesystem' % repo_name) | |||
RepoModel().delete(repo_name_full) | ||||
ReposGroupModel().delete(group_name) | ||||
r3797 | Session().commit() | |||
r1037 | ||||
r691 | def test_create_git(self): | |||
self.log_user() | ||||
repo_name = NEW_GIT_REPO | ||||
description = 'description for newly created repo' | ||||
r3056 | ||||
response = self.app.post(url('repos'), | ||||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_type='git', | |||
repo_name=repo_name, | ||||
repo_description=description)) | ||||
r2459 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | 'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (repo_name, repo_name)) | |||
r691 | ||||
r2459 | #test if the repo was created in the database | |||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).one() | |||
r691 | ||||
r2459 | self.assertEqual(new_repo.repo_name, repo_name) | |||
self.assertEqual(new_repo.description, description) | ||||
r691 | ||||
#test if repository is visible in the list ? | ||||
response = response.follow() | ||||
r2459 | response.mustcontain(repo_name) | |||
r691 | ||||
r1037 | #test if repository was created on filesystem | |||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
def test_create_git_non_ascii(self): | ||||
self.log_user() | ||||
non_ascii = "ąęł" | ||||
repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii) | ||||
repo_name_unicode = repo_name.decode('utf8') | ||||
description = 'description for newly created repo' + non_ascii | ||||
description_unicode = description.decode('utf8') | ||||
private = False | ||||
r3056 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_type='git', | |||
repo_name=repo_name, | ||||
repo_description=description)) | ||||
r2459 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | u'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (urllib.quote(repo_name), repo_name_unicode)) | |||
r2459 | ||||
#test if the repo was created in the database | ||||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name_unicode).one() | |||
self.assertEqual(new_repo.repo_name, repo_name_unicode) | ||||
self.assertEqual(new_repo.description, description_unicode) | ||||
#test if repository is visible in the list ? | ||||
response = response.follow() | ||||
response.mustcontain(repo_name) | ||||
#test if repository was created on filesystem | ||||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
r691 | ||||
def test_update(self): | ||||
response = self.app.put(url('repo', repo_name=HG_REPO)) | ||||
def test_update_browser_fakeout(self): | ||||
r1366 | response = self.app.post(url('repo', repo_name=HG_REPO), | |||
params=dict(_method='put')) | ||||
r691 | ||||
r2459 | def test_delete_hg(self): | |||
r691 | self.log_user() | |||
repo_name = 'vcs_test_new_to_delete' | ||||
description = 'description for newly created repo' | ||||
r3056 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_type='hg', | |||
repo_name=repo_name, | ||||
repo_description=description)) | ||||
r2459 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | 'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (repo_name, repo_name)) | |||
r691 | #test if the repo was created in the database | |||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).one() | |||
r691 | ||||
r1366 | self.assertEqual(new_repo.repo_name, repo_name) | |||
self.assertEqual(new_repo.description, description) | ||||
r691 | ||||
#test if repository is visible in the list ? | ||||
response = response.follow() | ||||
r2459 | response.mustcontain(repo_name) | |||
r691 | ||||
r2459 | #test if repository was created on filesystem | |||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
r691 | ||||
response = self.app.delete(url('repo', repo_name=repo_name)) | ||||
r3640 | self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name)) | |||
r691 | ||||
response.follow() | ||||
#check if repo was deleted from db | ||||
r3797 | deleted_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).scalar() | |||
r1366 | ||||
self.assertEqual(deleted_repo, None) | ||||
r691 | ||||
r2459 | self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)), | |||
False) | ||||
def test_delete_git(self): | ||||
self.log_user() | ||||
repo_name = 'vcs_test_new_to_delete' | ||||
description = 'description for newly created repo' | ||||
private = False | ||||
r3057 | response = self.app.post(url('repos'), | |||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3056 | repo_type='git', | |||
repo_name=repo_name, | ||||
repo_description=description)) | ||||
r2459 | self.checkSessionFlash(response, | |||
Mads Kiilerich
|
r3565 | 'Created repository <a href="/%s">%s</a>' | ||
r3550 | % (repo_name, repo_name)) | |||
r2459 | #test if the repo was created in the database | |||
r3797 | new_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).one() | |||
self.assertEqual(new_repo.repo_name, repo_name) | ||||
self.assertEqual(new_repo.description, description) | ||||
#test if repository is visible in the list ? | ||||
response = response.follow() | ||||
response.mustcontain(repo_name) | ||||
#test if repository was created on filesystem | ||||
try: | ||||
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) | ||||
r3631 | except Exception: | |||
r2459 | self.fail('no repo %s in filesystem' % repo_name) | |||
response = self.app.delete(url('repo', repo_name=repo_name)) | ||||
r3640 | self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name)) | |||
r2459 | ||||
response.follow() | ||||
#check if repo was deleted from db | ||||
r3797 | deleted_repo = Session().query(Repository)\ | |||
r2459 | .filter(Repository.repo_name == repo_name).scalar() | |||
self.assertEqual(deleted_repo, None) | ||||
self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)), | ||||
False) | ||||
r1366 | ||||
def test_delete_repo_with_group(self): | ||||
#TODO: | ||||
pass | ||||
r691 | ||||
def test_delete_browser_fakeout(self): | ||||
r1366 | response = self.app.post(url('repo', repo_name=HG_REPO), | |||
params=dict(_method='delete')) | ||||
r691 | ||||
r2459 | def test_show_hg(self): | |||
r691 | self.log_user() | |||
response = self.app.get(url('repo', repo_name=HG_REPO)) | ||||
r2459 | def test_show_git(self): | |||
self.log_user() | ||||
response = self.app.get(url('repo', repo_name=GIT_REPO)) | ||||
r691 | ||||
def test_edit(self): | ||||
response = self.app.get(url('edit_repo', repo_name=HG_REPO)) | ||||
r3629 | ||||
def test_set_private_flag_sets_default_to_none(self): | ||||
self.log_user() | ||||
#initially repository perm should be read | ||||
perm = _get_permission_for_user(user='default', repo=HG_REPO) | ||||
self.assertTrue(len(perm), 1) | ||||
self.assertEqual(perm[0].permission.permission_name, 'repository.read') | ||||
self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False) | ||||
response = self.app.put(url('repo', repo_name=HG_REPO), | ||||
r3647 | fixture._get_repo_create_params(repo_private=1, | |||
r3629 | repo_name=HG_REPO, | |||
user=TEST_USER_ADMIN_LOGIN)) | ||||
self.checkSessionFlash(response, | ||||
msg='Repository %s updated successfully' % (HG_REPO)) | ||||
self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, True) | ||||
#now the repo default permission should be None | ||||
perm = _get_permission_for_user(user='default', repo=HG_REPO) | ||||
self.assertTrue(len(perm), 1) | ||||
self.assertEqual(perm[0].permission.permission_name, 'repository.none') | ||||
response = self.app.put(url('repo', repo_name=HG_REPO), | ||||
r3647 | fixture._get_repo_create_params(repo_private=False, | |||
r3629 | repo_name=HG_REPO, | |||
user=TEST_USER_ADMIN_LOGIN)) | ||||
self.checkSessionFlash(response, | ||||
msg='Repository %s updated successfully' % (HG_REPO)) | ||||
self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False) | ||||
#we turn off private now the repo default permission should stay None | ||||
perm = _get_permission_for_user(user='default', repo=HG_REPO) | ||||
self.assertTrue(len(perm), 1) | ||||
self.assertEqual(perm[0].permission.permission_name, 'repository.none') | ||||
#update this permission back | ||||
perm[0].permission = Permission.get_by_key('repository.read') | ||||
Session().add(perm[0]) | ||||
Session().commit() | ||||