##// END OF EJS Templates
Wrapped calls for git and hg middleware in extra block that clears db Session....
Wrapped calls for git and hg middleware in extra block that clears db Session. tries to fix #318

File last commit:

r1758:a87aa385 beta
r1761:b265be1c beta
Show More
test_models.py
410 lines | 15.0 KiB | text/x-python | PythonLexer
implements #226 repo groups available by path...
r1538 import os
Fixes issue #201...
r1373 import unittest
from rhodecode.tests import *
implements #226 repo groups available by path...
r1538
from rhodecode.model.repos_group import ReposGroupModel
fixes #260 Put repo in group, then move group to another group -> repo becomes unavailable
r1539 from rhodecode.model.repo import RepoModel
Tests updates, Session refactoring
r1713 from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
fixed repo_create permission by adding missing commit statements...
r1758 UsersGroup, UsersGroupMember, Permission
implements #226 repo groups available by path...
r1538 from sqlalchemy.exc import IntegrityError
#302 - basic notification system, models+tests
r1702 from rhodecode.model.user import UserModel
Tests updates, Session refactoring
r1713 from rhodecode.model.meta import Session
from rhodecode.model.notification import NotificationModel
from rhodecode.model.users_group import UsersGroupModel
implements #226 repo groups available by path...
r1538
class TestReposGroups(unittest.TestCase):
    """Tests for creating, nesting, renaming, moving and deleting repo groups.

    Besides the database records (looked up through ``RepoGroup``), several
    tests also verify that a matching directory exists, or no longer exists,
    under ``TESTS_TMP_PATH``.
    """

    def setUp(self):
        """Make sure the three base groups used by the tests exist."""
        self.g1 = self.__make_group('test1', skip_if_exists=True)
        self.g2 = self.__make_group('test2', skip_if_exists=True)
        self.g3 = self.__make_group('test3', skip_if_exists=True)

    def tearDown(self):
        """Emit a marker line after every test."""
        print('out')

    def __check_path(self, *path):
        """Return True when ``TESTS_TMP_PATH/<path...>`` is a directory.

        :param path: path components relative to ``TESTS_TMP_PATH``
        """
        return os.path.isdir(os.path.join(TESTS_TMP_PATH, *path))

    def _check_folders(self):
        """Debug helper: print the entries found in ``TESTS_TMP_PATH``."""
        print(os.listdir(TESTS_TMP_PATH))

    def __make_group(self, path, desc='desc', parent_id=None,
                     skip_if_exists=False):
        """Create a repo group through ``ReposGroupModel`` and commit.

        :param path: value passed as ``group_name``; also used for the
            lookup of an already existing group
        :param desc: value passed as ``group_description``
        :param parent_id: value passed as ``group_parent_id``
        :param skip_if_exists: when true and a group is found by ``path``,
            return that group instead of creating a new one
        :returns: the existing or newly created group
        """
        gr = RepoGroup.get_by_group_name(path)
        if gr and skip_if_exists:
            return gr

        form_data = dict(group_name=path,
                         group_description=desc,
                         group_parent_id=parent_id)
        gr = ReposGroupModel().create(form_data)
        Session.commit()
        return gr

    def __delete_group(self, id_):
        """Delete the group with the given id through ``ReposGroupModel``."""
        ReposGroupModel().delete(id_)

    def __update_group(self, id_, path, desc='desc', parent_id=None):
        """Update a group's name, description and parent.

        :returns: whatever ``ReposGroupModel().update`` returns
        """
        form_data = dict(group_name=path,
                         group_description=desc,
                         group_parent_id=parent_id)
        return ReposGroupModel().update(id_, form_data)

    def test_create_group(self):
        """A new top-level group gets the expected path and directory."""
        g = self.__make_group('newGroup')
        self.assertEqual(g.full_path, 'newGroup')
        self.assertTrue(self.__check_path('newGroup'))

    def test_create_same_name_group(self):
        """Creating 'newGroup' again is expected to raise IntegrityError."""
        # NOTE(review): this appears to rely on 'newGroup' already existing
        # (created by test_create_group) - confirm test ordering.
        self.assertRaises(IntegrityError, self.__make_group, 'newGroup')
        # Reset the session after the failed commit.
        Session.rollback()

    def test_same_subgroup(self):
        """Nested groups report their parent and the full nested path."""
        sg1 = self.__make_group('sub1', parent_id=self.g1.group_id)
        self.assertEqual(sg1.parent_group, self.g1)
        self.assertEqual(sg1.full_path, 'test1/sub1')
        self.assertTrue(self.__check_path('test1', 'sub1'))

        ssg1 = self.__make_group('subsub1', parent_id=sg1.group_id)
        self.assertEqual(ssg1.parent_group, sg1)
        self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
        self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))

    def test_remove_group(self):
        """A deleted group is gone from the database and from disk."""
        sg1 = self.__make_group('deleteme')
        self.__delete_group(sg1.group_id)

        self.assertEqual(RepoGroup.get(sg1.group_id), None)
        # The checked directory name must be the one that was created,
        # otherwise the assertion can never fail.
        self.assertFalse(self.__check_path('deleteme'))

        sg1 = self.__make_group('deleteme', parent_id=self.g1.group_id)
        self.__delete_group(sg1.group_id)

        self.assertEqual(RepoGroup.get(sg1.group_id), None)
        self.assertFalse(self.__check_path('test1', 'deleteme'))

    def test_rename_single_group(self):
        """Renaming a top-level group moves its directory and drops the old name."""
        sg1 = self.__make_group('initial')

        self.__update_group(sg1.group_id, 'after')
        self.assertTrue(self.__check_path('after'))
        self.assertEqual(RepoGroup.get_by_group_name('initial'), None)

    def test_update_group_parent(self):
        """Renaming and re-parenting a group is reflected on disk and in lookups."""
        sg1 = self.__make_group('initial', parent_id=self.g1.group_id)

        # Rename while staying under the same parent.
        self.__update_group(sg1.group_id, 'after',
                            parent_id=self.g1.group_id)
        self.assertTrue(self.__check_path('test1', 'after'))
        self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)

        # Move under a different parent.
        self.__update_group(sg1.group_id, 'after',
                            parent_id=self.g3.group_id)
        self.assertTrue(self.__check_path('test3', 'after'))
        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)

        # Rename again without passing a parent.
        new_sg1 = self.__update_group(sg1.group_id, 'hello')
        self.assertTrue(self.__check_path('hello'))
        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)

    def test_subgrouping_with_repo(self):
        """A repo's name follows its group when that group is moved."""
        g1 = self.__make_group('g1')
        g2 = self.__make_group('g2')

        # create new repo
        form_data = dict(repo_name='john',
                         repo_name_full='john',
                         fork_name=None,
                         description=None,
                         repo_group=None,
                         private=False,
                         repo_type='hg',
                         clone_uri=None)
        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
        r = RepoModel().create(form_data, cur_user)

        self.assertEqual(r.repo_name, 'john')

        # put repo into group
        form_data['repo_group'] = g1.group_id
        form_data['perms_new'] = []
        form_data['perms_updates'] = []
        RepoModel().update(r.repo_name, form_data)
        self.assertEqual(r.repo_name, 'g1/john')

        # move the group that holds the repo under g2
        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
        self.assertTrue(self.__check_path('g2', 'g1'))

        # test repo
        self.assertEqual(r.repo_name,
                         os.path.join('g2', 'g1', r.just_name))
Tests updates, Session refactoring
class TestUser(unittest.TestCase):
    """Test creating a user, adding it to a users group and deleting it."""

    def __init__(self, methodName='runTest'):
        """Call ``Session.remove()`` and then run the base initialiser."""
        Session.remove()
        super(TestUser, self).__init__(methodName=methodName)

    def test_create_and_remove(self):
        """Deleting a user leaves no ``UsersGroupMember`` rows behind."""
        user_kwargs = dict(username=u'test_user', password=u'qweqwe',
                           email=u'u232@rhodecode.org',
                           name=u'u1', lastname=u'u1')
        created_user = UserModel().create_or_update(**user_kwargs)
        Session.commit()
        self.assertEqual(User.get_by_username(u'test_user'), created_user)

        # make users group
        group = UsersGroupModel().create('some_example_group')
        Session.commit()

        UsersGroupModel().add_user_to_group(group, created_user)
        Session.commit()

        stored_group = UsersGroup.get(group.users_group_id)
        self.assertEqual(stored_group, group)
        member_count = UsersGroupMember.query().count()
        self.assertEqual(member_count, 1)

        UserModel().delete(created_user.user_id)
        Session.commit()

        remaining_members = UsersGroupMember.query().all()
        self.assertEqual(remaining_members, [])
fixes #260 Put repo in group, then move group to another group -> repo becomes unavailable
r1539
#302 - basic notification system, models+tests
class TestNotifications(unittest.TestCase):
    """Tests for ``NotificationModel`` and the ``Notification`` /
    ``UserNotification`` records it creates.

    ``self.u1``, ``self.u2`` and ``self.u3`` hold user ids (not user
    objects) of three users created in ``__init__``.
    """

    def __init__(self, methodName='runTest'):
        """Create (or update) the three test users, then init the test case."""
        Session.remove()
        self.u1 = self._create_user_id(username=u'u1',
                                       email=u'u1@rhodecode.org',
                                       name=u'u1', lastname=u'u1')
        # NOTE(review): lastname u'u3' for user u2 looks like a copy/paste
        # slip; kept as-is because it is test data - confirm.
        self.u2 = self._create_user_id(username=u'u2',
                                       email=u'u2@rhodecode.org',
                                       name=u'u2', lastname=u'u3')
        self.u3 = self._create_user_id(username=u'u3',
                                       email=u'u3@rhodecode.org',
                                       name=u'u3', lastname=u'u3')
        super(TestNotifications, self).__init__(methodName=methodName)

    def _create_user_id(self, username, email, name, lastname):
        """Create or update a user, commit, and return its ``user_id``."""
        user = UserModel().create_or_update(username=username,
                                            password=u'qweqwe',
                                            email=email,
                                            name=name, lastname=lastname)
        Session.commit()
        return user.user_id

    def _assert_no_notifications(self):
        """Assert that no Notification and no UserNotification rows exist."""
        self.assertEqual([], Notification.query().all())
        self.assertEqual([], UserNotification.query().all())

    def _user_notification(self, notification, user_id):
        """Return the UserNotification linking ``notification`` to ``user_id``.

        :returns: the result of ``.scalar()`` on the filtered query
        """
        return UserNotification.query()\
            .filter(UserNotification.notification == notification)\
            .filter(UserNotification.user_id == user_id)\
            .scalar()

    def _assert_unread_counts(self, expected):
        """Check ``get_unread_cnt_for_user`` for each ``(user_id, count)`` pair."""
        for user_id, count in expected:
            self.assertEqual(NotificationModel()
                             .get_unread_cnt_for_user(user_id), count)

    def _clean_notifications(self):
        """Delete every Notification, commit, and assert none are left."""
        for n in Notification.query().all():
            Session.delete(n)

        Session.commit()
        self.assertEqual(Notification.query().all(), [])

    def tearDown(self):
        """Remove all notifications after each test."""
        self._clean_notifications()

    def test_create_notification(self):
        """Creating a notification stores it once, linked to each recipient."""
        self._assert_no_notifications()

        usrs = [self.u1, self.u2]
        notification = NotificationModel().create(created_by=self.u1,
                                                  subject=u'subj',
                                                  body=u'hi there',
                                                  recipients=usrs)
        Session.commit()
        u1 = User.get(self.u1)
        u2 = User.get(self.u2)

        notifications = Notification.query().all()
        self.assertEqual(len(notifications), 1)

        unotification = UserNotification.query()\
            .filter(UserNotification.notification == notification).all()

        self.assertEqual(notifications[0].recipients, [u1, u2])
        self.assertEqual(notification.notification_id,
                         notifications[0].notification_id)
        self.assertEqual(len(unotification), len(usrs))
        self.assertEqual([x.user.user_id for x in unotification], usrs)

    def test_user_notifications(self):
        """Both notifications sent to u3 are listed on the user."""
        self._assert_no_notifications()

        notification1 = NotificationModel().create(created_by=self.u1,
                                                   subject=u'subj',
                                                   body=u'hi there1',
                                                   recipients=[self.u3])
        Session.commit()
        notification2 = NotificationModel().create(created_by=self.u1,
                                                   subject=u'subj',
                                                   body=u'hi there2',
                                                   recipients=[self.u3])
        Session.commit()
        u3 = Session.query(User).get(self.u3)

        # Sort on an explicit key so the comparison does not depend on how
        # the model objects themselves happen to be ordered.
        def by_id(n):
            return n.notification_id

        self.assertEqual(
            sorted([x.notification for x in u3.notifications], key=by_id),
            sorted([notification2, notification1], key=by_id))

    def test_delete_notifications(self):
        """Deleting a notification also leaves no UserNotification rows for it."""
        self._assert_no_notifications()

        notification = NotificationModel().create(
            created_by=self.u1,
            subject=u'title', body=u'hi there3',
            recipients=[self.u3, self.u1, self.u2])
        Session.commit()

        notifications = Notification.query().all()
        self.assertTrue(notification in notifications)

        Notification.delete(notification.notification_id)
        Session.commit()

        notifications = Notification.query().all()
        self.assertFalse(notification in notifications)

        un = UserNotification.query()\
            .filter(UserNotification.notification == notification).all()
        self.assertEqual(un, [])

    def test_delete_association(self):
        """Deleting u3's association keeps the notification and other links."""
        self._assert_no_notifications()

        notification = NotificationModel().create(
            created_by=self.u1,
            subject=u'title', body=u'hi there3',
            recipients=[self.u3, self.u1, self.u2])
        Session.commit()

        unotification = self._user_notification(notification, self.u3)
        self.assertEqual(unotification.user_id, self.u3)

        NotificationModel().delete(self.u3,
                                   notification.notification_id)
        Session.commit()

        self.assertEqual(
            self._user_notification(notification, self.u3), None)

        # notification object is still there
        self.assertEqual(Notification.query().all(), [notification])

        # u1 and u2 still have assignments
        self.assertNotEqual(
            self._user_notification(notification, self.u1), None)
        self.assertNotEqual(
            self._user_notification(notification, self.u2), None)

    def test_notification_counter(self):
        """Unread counters grow by one per notification received."""
        self._clean_notifications()
        self._assert_no_notifications()

        NotificationModel().create(created_by=self.u1,
                                   subject=u'title',
                                   body=u'hi there_delete',
                                   recipients=[self.u3, self.u1])
        Session.commit()

        self._assert_unread_counts([(self.u1, 1),
                                    (self.u2, 0),
                                    (self.u3, 1)])

        NotificationModel().create(created_by=self.u1,
                                   subject=u'title', body=u'hi there3',
                                   recipients=[self.u3, self.u1, self.u2])
        Session.commit()

        self._assert_unread_counts([(self.u1, 2),
                                    (self.u2, 1),
                                    (self.u3, 2)])
fixed repo_create permission by adding missing commit statements...
r1758
class TestUsers(unittest.TestCase):
    """Tests for granting, checking and revoking permissions on a user."""

    def __init__(self, methodName='runTest'):
        """Delegate straight to the base initialiser."""
        super(TestUsers, self).__init__(methodName=methodName)

    def setUp(self):
        """Create (or update) the user the permission tests operate on."""
        user_kwargs = dict(username=u'u1',
                           password=u'qweqwe',
                           email=u'u1@rhodecode.org',
                           name=u'u1', lastname=u'u1')
        self.u1 = UserModel().create_or_update(**user_kwargs)

    def tearDown(self):
        """Revoke every permission from the user, delete it and commit."""
        for permission in Permission.query().all():
            UserModel().revoke_perm(self.u1, permission)

        UserModel().delete(self.u1)
        Session.commit()

    def test_add_perm(self):
        """After granting a permission, ``has_perm`` reports True."""
        first_perm = Permission.query().all()[0]
        UserModel().grant_perm(self.u1, first_perm)
        Session.commit()
        granted = UserModel().has_perm(self.u1, first_perm)
        self.assertEqual(granted, True)

    def test_has_perm(self):
        """Without any grant, ``has_perm`` is False for every permission."""
        for permission in Permission.query().all():
            self.assertEqual(False,
                             UserModel().has_perm(self.u1, permission))

    def test_revoke_perm(self):
        """A granted permission is no longer reported after revoking it."""
        first_perm = Permission.query().all()[0]
        UserModel().grant_perm(self.u1, first_perm)
        Session.commit()
        granted = UserModel().has_perm(self.u1, first_perm)
        self.assertEqual(granted, True)

        # revoke
        UserModel().revoke_perm(self.u1, first_perm)
        Session.commit()
        still_granted = UserModel().has_perm(self.u1, first_perm)
        self.assertEqual(still_granted, False)