# HG changeset patch # User Marcin Kuzminski # Date 2011-12-06 02:06:01 # Node ID a87aa385f21c198d4d49b9be88563b4e2033abde # Parent 2aa7f454204e8da2bca0e14c3635fbb11e075df8 fixed repo_create permission by adding missing commit statements - added few tests for checking permission in UserModel - added __json__() into get_dict() to fetch from it hybrid_properties and any additional custom properties - code garden diff --git a/rhodecode/controllers/admin/users.py b/rhodecode/controllers/admin/users.py --- a/rhodecode/controllers/admin/users.py +++ b/rhodecode/controllers/admin/users.py @@ -197,7 +197,7 @@ class UsersController(BaseController): user_model.grant_perm(id, perm) h.flash(_("Granted 'repository create' permission to user"), category='success') - + Session.commit() else: perm = Permission.get_by_key('hg.create.repository') user_model.revoke_perm(id, perm) @@ -206,5 +206,5 @@ class UsersController(BaseController): user_model.grant_perm(id, perm) h.flash(_("Revoked 'repository create' permission to user"), category='success') - + Session.commit() return redirect(url('edit_user', id=id)) diff --git a/rhodecode/lib/db_manage.py b/rhodecode/lib/db_manage.py --- a/rhodecode/lib/db_manage.py +++ b/rhodecode/lib/db_manage.py @@ -245,12 +245,20 @@ class DbManage(object): self.create_user(username, password, email, True) else: log.info('creating admin and regular test users') - self.create_user('test_admin', 'test12', - 'test_admin@mail.com', True) - self.create_user('test_regular', 'test12', - 'test_regular@mail.com', False) - self.create_user('test_regular2', 'test12', - 'test_regular2@mail.com', False) + from rhodecode.tests import TEST_USER_ADMIN_LOGIN,\ + TEST_USER_ADMIN_PASS ,TEST_USER_ADMIN_EMAIL,TEST_USER_REGULAR_LOGIN,\ + TEST_USER_REGULAR_PASS,TEST_USER_REGULAR_EMAIL,\ + TEST_USER_REGULAR2_LOGIN,TEST_USER_REGULAR2_PASS,\ + TEST_USER_REGULAR2_EMAIL + + self.create_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, + TEST_USER_ADMIN_EMAIL, True) + + self.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, + TEST_USER_REGULAR_EMAIL, False) + + self.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS, + TEST_USER_REGULAR2_EMAIL, False) def create_ui_settings(self): """Creates ui settings, fills out hooks diff --git a/rhodecode/model/db.py b/rhodecode/model/db.py --- a/rhodecode/model/db.py +++ b/rhodecode/model/db.py @@ -82,8 +82,8 @@ class ModelSerializer(json.JSONEncoder): return json.JSONEncoder.default(self, obj) class BaseModel(object): - """Base Model for all classess - + """ + Base Model for all classess """ @classmethod @@ -91,13 +91,20 @@ class BaseModel(object): """return column names for this model """ return class_mapper(cls).c.keys() - def get_dict(self): - """return dict with keys and values corresponding - to this model data """ + def get_dict(self, serialized=False): + """ + return dict with keys and values corresponding + to this model data + """ d = {} for k in self._get_keys(): d[k] = getattr(self, k) + + # also use __json__() if present to get additional fields + if hasattr(self, '__json__'): + for k,val in self.__json__().iteritems(): + d[k] = val return d def get_appstruct(self): @@ -350,6 +357,11 @@ class User(Base, BaseModel): log.debug('updated user %s lastlogin', self.username) + def __json__(self): + return dict(email=self.email, + full_name=self.full_name) + + class UserLog(Base, BaseModel): __tablename__ = 'user_logs' __table_args__ = {'extend_existing':True} diff --git a/rhodecode/model/user.py b/rhodecode/model/user.py --- a/rhodecode/model/user.py +++ b/rhodecode/model/user.py @@ -281,9 +281,10 @@ class UserModel(BaseModel): log.error(traceback.format_exc()) raise - def delete(self, user_id): + def delete(self, user): + user = self.__get_user(user) + try: - user = self.get(user_id, cache=False) if user.username == 'default': raise DefaultUserException( _("You can't remove this user since it's" @@ -470,35 +471,37 @@ class UserModel(BaseModel): return user - - def has_perm(self, user, perm): if not isinstance(perm, Permission): - raise Exception('perm needs to be an instance of Permission class') + raise Exception('perm needs to be an instance of Permission class ' + 'got %s instead' % type(perm)) user = self.__get_user(user) - return UserToPerm.query().filter(UserToPerm.user == user.user)\ + return UserToPerm.query().filter(UserToPerm.user == user)\ .filter(UserToPerm.permission == perm).scalar() is not None def grant_perm(self, user, perm): if not isinstance(perm, Permission): - raise Exception('perm needs to be an instance of Permission class') + raise Exception('perm needs to be an instance of Permission class ' + 'got %s instead' % type(perm)) user = self.__get_user(user) new = UserToPerm() - new.user = user.user + new.user = user new.permission = perm self.sa.add(new) def revoke_perm(self, user, perm): if not isinstance(perm, Permission): - raise Exception('perm needs to be an instance of Permission class') + raise Exception('perm needs to be an instance of Permission class ' + 'got %s instead' % type(perm)) user = self.__get_user(user) - obj = UserToPerm.query().filter(UserToPerm.user == user.user)\ - .filter(UserToPerm.permission == perm).one() - self.sa.delete(obj) + obj = UserToPerm.query().filter(UserToPerm.user == user)\ + .filter(UserToPerm.permission == perm).scalar() + if obj: + self.sa.delete(obj) diff --git a/rhodecode/tests/__init__.py b/rhodecode/tests/__init__.py --- a/rhodecode/tests/__init__.py +++ b/rhodecode/tests/__init__.py @@ -31,9 +31,13 @@ time.tzset() log = logging.getLogger(__name__) -__all__ = ['environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO', - 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK', - 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS' ] +__all__ = [ + 'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO', + 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK', + 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS', + 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN', + 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL' +] # Invoke websetup with the current config file # SetupCommand('setup-app').run([config_file]) @@ -48,6 +52,16 @@ environ = {} TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next()) TEST_USER_ADMIN_LOGIN = 'test_admin' TEST_USER_ADMIN_PASS = 'test12' +TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com' + +TEST_USER_REGULAR_LOGIN = 'test_regular' +TEST_USER_REGULAR_PASS = 'test12' +TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com' + +TEST_USER_REGULAR2_LOGIN = 'test_regular2' +TEST_USER_REGULAR2_PASS = 'test12' +TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com' + HG_REPO = 'vcs_test_hg' GIT_REPO = 'vcs_test_git' diff --git a/rhodecode/tests/functional/test_admin_users.py b/rhodecode/tests/functional/test_admin_users.py --- a/rhodecode/tests/functional/test_admin_users.py +++ b/rhodecode/tests/functional/test_admin_users.py @@ -1,11 +1,13 @@ from rhodecode.tests import * -from rhodecode.model.db import User +from rhodecode.model.db import User, Permission from rhodecode.lib.auth import check_password from sqlalchemy.orm.exc import NoResultFound +from rhodecode.model.user import UserModel class TestAdminUsersController(TestController): def test_index(self): + self.log_user() response = self.app.get(url('users')) # Test response... @@ -21,30 +23,31 @@ class TestAdminUsersController(TestContr lastname = 'lastname' email = 'mail@mail.com' - response = self.app.post(url('users'), {'username':username, - 'password':password, - 'password_confirmation':password_confirmation, - 'name':name, - 'active':True, - 'lastname':lastname, - 'email':email}) + response = self.app.post(url('users'), + {'username':username, + 'password':password, + 'password_confirmation':password_confirmation, + 'name':name, + 'active':True, + 'lastname':lastname, + 'email':email}) - assert '''created user %s''' % (username) in response.session['flash'][0], 'No flash message about new user' + self.assertTrue('''created user %s''' % (username) in + response.session['flash'][0]) - new_user = self.Session.query(User).filter(User.username == username).one() - + new_user = self.Session.query(User).\ + filter(User.username == username).one() - assert new_user.username == username, 'wrong info about username' - assert check_password(password, new_user.password) == True , 'wrong info about password' - assert new_user.name == name, 'wrong info about name' - assert new_user.lastname == lastname, 'wrong info about lastname' - assert new_user.email == email, 'wrong info about email' - + self.assertEqual(new_user.username,username) + self.assertEqual(check_password(password, new_user.password),True) + self.assertEqual(new_user.name,name) + self.assertEqual(new_user.lastname,lastname) + self.assertEqual(new_user.email,email) response.follow() response = response.follow() - assert """edit">newtestuser""" in response.body + self.assertTrue("""edit">newtestuser""" in response.body) def test_create_err(self): self.log_user() @@ -61,9 +64,9 @@ class TestAdminUsersController(TestContr 'lastname':lastname, 'email':email}) - assert """Invalid username""" in response.body - assert """Please enter a value""" in response.body - assert """An email address must contain a single @""" in response.body + self.assertTrue("""Invalid username""" in response.body) + self.assertTrue("""Please enter a value""" in response.body) + self.assertTrue("""An email address must contain a single @""" in response.body) def get_user(): self.Session.query(User).filter(User.username == username).one() @@ -71,6 +74,7 @@ class TestAdminUsersController(TestContr self.assertRaises(NoResultFound, get_user), 'found user in database' def test_new(self): + self.log_user() response = self.app.get(url('new_user')) def test_new_as_xml(self): @@ -100,14 +104,17 @@ class TestAdminUsersController(TestContr response = response.follow() - new_user = self.Session.query(User).filter(User.username == username).one() + new_user = self.Session.query(User)\ + .filter(User.username == username).one() response = self.app.delete(url('user', id=new_user.user_id)) - assert """successfully deleted user""" in response.session['flash'][0], 'No info about user deletion' + self.assertTrue("""successfully deleted user""" in + response.session['flash'][0]) def test_delete_browser_fakeout(self): - response = self.app.post(url('user', id=1), params=dict(_method='delete')) + response = self.app.post(url('user', id=1), + params=dict(_method='delete')) def test_show(self): response = self.app.get(url('user', id=1)) @@ -116,7 +123,57 @@ class TestAdminUsersController(TestContr response = self.app.get(url('formatted_user', id=1, format='xml')) def test_edit(self): - response = self.app.get(url('edit_user', id=1)) + self.log_user() + user = User.get_by_username(TEST_USER_ADMIN_LOGIN) + response = self.app.get(url('edit_user', id=user.user_id)) + + + def test_add_perm_create_repo(self): + self.log_user() + perm_none = Permission.get_by_key('hg.create.none') + perm_create = Permission.get_by_key('hg.create.repository') + + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + + + #User should have None permission on creation repository + self.assertEqual(UserModel().has_perm(user, perm_none), False) + self.assertEqual(UserModel().has_perm(user, perm_create), False) + + response = self.app.post(url('user_perm', id=user.user_id), + params=dict(_method='put', + create_repo_perm=True)) + + perm_none = Permission.get_by_key('hg.create.none') + perm_create = Permission.get_by_key('hg.create.repository') + + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + #User should have None permission on creation repository + self.assertEqual(UserModel().has_perm(user, perm_none), False) + self.assertEqual(UserModel().has_perm(user, perm_create), True) + + def test_revoke_perm_create_repo(self): + self.log_user() + perm_none = Permission.get_by_key('hg.create.none') + perm_create = Permission.get_by_key('hg.create.repository') + + user = User.get_by_username(TEST_USER_REGULAR2_LOGIN) + + + #User should have None permission on creation repository + self.assertEqual(UserModel().has_perm(user, perm_none), False) + self.assertEqual(UserModel().has_perm(user, perm_create), False) + + response = self.app.post(url('user_perm', id=user.user_id), + params=dict(_method='put')) + + perm_none = Permission.get_by_key('hg.create.none') + perm_create = Permission.get_by_key('hg.create.repository') + + user = User.get_by_username(TEST_USER_REGULAR2_LOGIN) + #User should have None permission on creation repository + self.assertEqual(UserModel().has_perm(user, perm_none), True) + self.assertEqual(UserModel().has_perm(user, perm_create), False) def test_edit_as_xml(self): response = self.app.get(url('formatted_edit_user', id=1, format='xml')) diff --git a/rhodecode/tests/test_models.py b/rhodecode/tests/test_models.py --- a/rhodecode/tests/test_models.py +++ b/rhodecode/tests/test_models.py @@ -5,7 +5,7 @@ from rhodecode.tests import * from rhodecode.model.repos_group import ReposGroupModel from rhodecode.model.repo import RepoModel from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \ - UsersGroup, UsersGroupMember + UsersGroup, UsersGroupMember, Permission from sqlalchemy.exc import IntegrityError from rhodecode.model.user import UserModel @@ -158,7 +158,10 @@ class TestReposGroups(unittest.TestCase) self.assertEqual(r.repo_name, os.path.join('g2', 'g1', r.just_name)) class TestUser(unittest.TestCase): - + def __init__(self, methodName='runTest'): + Session.remove() + super(TestUser, self).__init__(methodName=methodName) + def test_create_and_remove(self): usr = UserModel().create_or_update(username=u'test_user', password=u'qweqwe', email=u'u232@rhodecode.org', @@ -184,6 +187,7 @@ class TestUser(unittest.TestCase): class TestNotifications(unittest.TestCase): def __init__(self, methodName='runTest'): + Session.remove() self.u1 = UserModel().create_or_update(username=u'u1', password=u'qweqwe', email=u'u1@rhodecode.org', @@ -214,6 +218,8 @@ class TestNotifications(unittest.TestCas Session.commit() self.assertEqual(Notification.query().all(), []) + def tearDown(self): + self._clean_notifications() def test_create_notification(self): self.assertEqual([], Notification.query().all()) @@ -239,7 +245,6 @@ class TestNotifications(unittest.TestCas self.assertEqual(len(unotification), len(usrs)) self.assertEqual([x.user.user_id for x in unotification], usrs) - self._clean_notifications() def test_user_notifications(self): self.assertEqual([], Notification.query().all()) @@ -257,7 +262,6 @@ class TestNotifications(unittest.TestCas self.assertEqual(sorted([x.notification for x in u3.notifications]), sorted([notification2, notification1])) - self._clean_notifications() def test_delete_notifications(self): self.assertEqual([], Notification.query().all()) @@ -280,7 +284,6 @@ class TestNotifications(unittest.TestCas == notification).all() self.assertEqual(un, []) - self._clean_notifications() def test_delete_association(self): @@ -329,8 +332,6 @@ class TestNotifications(unittest.TestCas .scalar() self.assertNotEqual(u2notification, None) - self._clean_notifications() - def test_notification_counter(self): self._clean_notifications() self.assertEqual([], Notification.query().all()) @@ -359,4 +360,51 @@ class TestNotifications(unittest.TestCas .get_unread_cnt_for_user(self.u2), 1) self.assertEqual(NotificationModel() .get_unread_cnt_for_user(self.u3), 2) - self._clean_notifications() + +class TestUsers(unittest.TestCase): + + def __init__(self, methodName='runTest'): + super(TestUsers, self).__init__(methodName=methodName) + + def setUp(self): + self.u1 = UserModel().create_or_update(username=u'u1', + password=u'qweqwe', + email=u'u1@rhodecode.org', + name=u'u1', lastname=u'u1') + + def tearDown(self): + perm = Permission.query().all() + for p in perm: + UserModel().revoke_perm(self.u1, p) + + UserModel().delete(self.u1) + Session.commit() + + def test_add_perm(self): + perm = Permission.query().all()[0] + UserModel().grant_perm(self.u1, perm) + Session.commit() + self.assertEqual(UserModel().has_perm(self.u1, perm), True) + + def test_has_perm(self): + perm = Permission.query().all() + for p in perm: + has_p = UserModel().has_perm(self.u1, p) + self.assertEqual(False, has_p) + + def test_revoke_perm(self): + perm = Permission.query().all()[0] + UserModel().grant_perm(self.u1, perm) + Session.commit() + self.assertEqual(UserModel().has_perm(self.u1, perm), True) + + #revoke + UserModel().revoke_perm(self.u1, perm) + Session.commit() + self.assertEqual(UserModel().has_perm(self.u1, perm),False) + + + + + +