diff --git a/rhodecode/lib/db_manage.py b/rhodecode/lib/db_manage.py
--- a/rhodecode/lib/db_manage.py
+++ b/rhodecode/lib/db_manage.py
@@ -44,6 +44,7 @@ from rhodecode.model.repos_group import
 #from rhodecode.model import meta
 from rhodecode.model.meta import Session, Base
 from rhodecode.model.repo import RepoModel
+from rhodecode.model.permission import PermissionModel
 
 log = logging.getLogger(__name__)
 
@@ -550,7 +551,7 @@ class DbManage(object):
         u2p = UserToPerm.query()\
             .filter(UserToPerm.user == default_user).all()
         fixed = False
-        if len(u2p) != len(User.DEFAULT_PERMISSIONS):
+        if len(u2p) != len(Permission.DEFAULT_USER_PERMISSIONS):
             for p in u2p:
                 Session().delete(p)
             fixed = True
@@ -682,6 +683,9 @@ class DbManage(object):
                               firstname='Anonymous', lastname='User')
 
     def create_permissions(self):
+        """
+        Creates all permissions defined in the system
+        """
         # module.(access|create|change|delete)_[name]
         # module.(none|read|write|admin)
 
@@ -693,27 +697,12 @@ class DbManage(object):
                 self.sa.add(new_perm)
 
     def populate_default_permissions(self):
+        """
+        Populate default permissions. It will create only the default
+        permissions that are missing, and not alter already defined ones
+        """
         log.info('creating default user permissions')
-
-        default_user = User.get_by_username('default')
-
-        for def_perm in User.DEFAULT_PERMISSIONS:
-
-            perm = self.sa.query(Permission)\
-             .filter(Permission.permission_name == def_perm)\
-             .scalar()
-            if not perm:
-                raise Exception(
-                  'CRITICAL: permission %s not found inside database !!'
-                  % def_perm
-                )
-            if not UserToPerm.query()\
-                .filter(UserToPerm.permission == perm)\
-                .filter(UserToPerm.user == default_user).scalar():
-                reg_perm = UserToPerm()
-                reg_perm.user = default_user
-                reg_perm.permission = perm
-                self.sa.add(reg_perm)
+        PermissionModel(self.sa).create_default_permissions(user=User.DEFAULT_USER)
 
     @staticmethod
     def check_waitress():
diff --git a/rhodecode/model/db.py b/rhodecode/model/db.py
--- a/rhodecode/model/db.py
+++ b/rhodecode/model/db.py
@@ -320,10 +320,7 @@ class User(Base, BaseModel):
          'mysql_charset': 'utf8'}
     )
     DEFAULT_USER = 'default'
-    DEFAULT_PERMISSIONS = [
-        'hg.register.manual_activate', 'hg.create.repository',
-        'hg.fork.repository', 'repository.read', 'group.read'
-    ]
+
     user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
     username = Column("username", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
     password = Column("password", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
@@ -502,6 +499,13 @@ class User(Base, BaseModel):
             raise Exception('Missing administrative account!')
         return user
 
+    @classmethod
+    def get_default_user(cls, cache=False):
+        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
+        if user is None:
+            raise Exception('Missing default account!')
+        return user
+
     def get_api_data(self):
         """
         Common function for generating user related data for API
@@ -1405,6 +1409,8 @@ class Permission(Base, BaseModel):
          'mysql_charset': 'utf8'},
     )
     PERMS = [
+        ('hg.admin', _('RhodeCode Administrator')),
+
         ('repository.none', _('Repository no access')),
         ('repository.read', _('Repository read access')),
         ('repository.write', _('Repository write access')),
@@ -1420,11 +1426,12 @@ class Permission(Base, BaseModel):
         ('usergroup.write', _('User group write access')),
         ('usergroup.admin', _('User group admin access')),
 
-        ('hg.admin', _('RhodeCode Administrator')),
         ('hg.create.none', _('Repository creation disabled')),
         ('hg.create.repository', _('Repository creation enabled')),
+
         ('hg.fork.none', _('Repository forking disabled')),
         ('hg.fork.repository', _('Repository forking enabled')),
+
         ('hg.register.none', _('Register disabled')),
         ('hg.register.manual_activate', _('Register new user with RhodeCode '
                                           'with manual activation')),
@@ -1433,6 +1440,16 @@ class Permission(Base, BaseModel):
                                         'with auto activation')),
     ]
 
+    #definition of system default permissions for DEFAULT user
+    DEFAULT_USER_PERMISSIONS = [
+        'repository.read',
+        'group.read',
+        'usergroup.read',
+        'hg.create.repository',
+        'hg.fork.repository',
+        'hg.register.manual_activate',
+    ]
+
     # defines which permissions are more important higher the more important
     PERM_WEIGHTS = {
         'repository.none': 0,
diff --git a/rhodecode/model/permission.py b/rhodecode/model/permission.py
--- a/rhodecode/model/permission.py
+++ b/rhodecode/model/permission.py
@@ -43,11 +43,53 @@ class PermissionModel(BaseModel):
 
     cls = Permission
 
+    def create_default_permissions(self, user):
+        """
+        Creates only missing default permissions for user. A default is
+        skipped when the user already holds any permission from the same
+        group, e.g. `hg.create.none` blocks `hg.create.repository`.
+
+        :param user: user to complete, resolved with `self._get_user`
+        """
+        user = self._get_user(user)
+
+        def _make_perm(perm):
+            new_perm = UserToPerm()
+            new_perm.user = user
+            new_perm.permission = Permission.get_by_key(perm)
+            return new_perm
+
+        def _get_group(perm_name):
+            # group is everything before the last dot, so `hg.create.*`,
+            # `hg.fork.*` and `hg.register.*` stay three separate groups
+            return perm_name.rsplit('.', 1)[0]
+
+        perms = UserToPerm.query().filter(UserToPerm.user == user).all()
+        defined_perms_groups = set(_get_group(x.permission.permission_name)
+                                   for x in perms)
+        log.debug('GOT ALREADY DEFINED:%s', perms)
+        DEFAULT_PERMS = Permission.DEFAULT_USER_PERMISSIONS
+
+        # for every default permission that needs to be created, we check if
+        # its group is already defined, if it's not we create default perm
+        for perm_name in DEFAULT_PERMS:
+            gr = _get_group(perm_name)
+            if gr not in defined_perms_groups:
+                log.debug('GR:%s not found, creating permission %s',
+                          gr, perm_name)
+                new_perm = _make_perm(perm_name)
+                self.sa.add(new_perm)
+
     def update(self, form_result):
         perm_user = User.get_by_username(username=form_result['perm_user_name'])
-        u2p = self.sa.query(UserToPerm).filter(UserToPerm.user == perm_user).all()
 
         try:
+            # stage 1 set anonymous access
+            if perm_user.username == 'default':
+                perm_user.active = str2bool(form_result['anonymous'])
+                self.sa.add(perm_user)
+
+            # stage 2 reset defaults and set them from form data
             def _make_new(usr, perm_name):
                 new = UserToPerm()
                 new.user = usr
@@ -56,6 +98,9 @@ class PermissionModel(BaseModel):
             # clear current entries, to make this function idempotent
             # it will fix even if we define more permissions or permissions
             # are somehow missing
+            u2p = self.sa.query(UserToPerm)\
+                .filter(UserToPerm.user == perm_user)\
+                .all()
             for p in u2p:
                 self.sa.delete(p)
             #create fresh set of permissions
@@ -65,7 +110,7 @@ class PermissionModel(BaseModel):
                 p = _make_new(perm_user, form_result[def_perm_key])
                 self.sa.add(p)
 
-            #stage 2 update all default permissions for repos if checked
+            #stage 3 update all default permissions for repos if checked
             if form_result['overwrite_default_repo'] == True:
                 _def_name = form_result['default_repo_perm'].split('repository.')[-1]
                 _def = Permission.get_by_key('repository.' + _def_name)
@@ -89,11 +134,6 @@ class PermissionModel(BaseModel):
                     g2p.permission = _def
                     self.sa.add(g2p)
 
-            # stage 3 set anonymous access
-            if perm_user.username == 'default':
-                perm_user.active = str2bool(form_result['anonymous'])
-                self.sa.add(perm_user)
-
             self.sa.commit()
         except (DatabaseError,):
             log.error(traceback.format_exc())
diff --git a/rhodecode/tests/models/test_permissions.py b/rhodecode/tests/models/test_permissions.py
--- a/rhodecode/tests/models/test_permissions.py
+++ b/rhodecode/tests/models/test_permissions.py
@@ -4,12 +4,14 @@ from rhodecode.tests import *
 from rhodecode.tests.fixture import Fixture
 from rhodecode.model.repos_group import ReposGroupModel
 from rhodecode.model.repo import RepoModel
-from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm
+from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm,\
+    Permission, UserToPerm
 from rhodecode.model.user import UserModel
 
 from rhodecode.model.meta import Session
 from rhodecode.model.users_group import UserGroupModel
 from rhodecode.lib.auth import AuthUser
+from rhodecode.model.permission import PermissionModel
 
 fixture = Fixture()
 
@@ -101,13 +103,15 @@ class TestPermissions(unittest.TestCase)
         u1_auth = AuthUser(user_id=self.u1.user_id)
         perms = {
             'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
-            'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
+            'global': set(Permission.DEFAULT_USER_PERMISSIONS),
             'repositories': {u'vcs_test_hg': u'repository.read'}
         }
         self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
                          perms['repositories'][HG_REPO])
         self.assertEqual(u1_auth.permissions['repositories_groups'],
                          perms['repositories_groups'])
+        self.assertEqual(u1_auth.permissions['global'],
+                         perms['global'])
 
     def test_default_admin_group_perms(self):
         self.g1 = fixture.create_group('test1', skip_if_exists=True)
@@ -347,7 +351,8 @@ class TestPermissions(unittest.TestCase)
         self.assertEqual(u1_auth.permissions['global'],
                          set(['hg.create.repository', 'hg.fork.repository',
                               'hg.register.manual_activate',
-                              'repository.read', 'group.read']))
+                              'repository.read', 'group.read',
+                              'usergroup.read']))
 
     def test_inherited_permissions_from_default_on_user_disabled(self):
         user_model = UserModel()
@@ -365,7 +370,8 @@ class TestPermissions(unittest.TestCase)
         self.assertEqual(u1_auth.permissions['global'],
                          set(['hg.create.none', 'hg.fork.none',
                               'hg.register.manual_activate',
-                              'repository.read', 'group.read']))
+                              'repository.read', 'group.read',
+                              'usergroup.read']))
 
     def test_non_inherited_permissions_from_default_on_user_enabled(self):
         user_model = UserModel()
@@ -391,7 +397,8 @@ class TestPermissions(unittest.TestCase)
         self.assertEqual(u1_auth.permissions['global'],
                          set(['hg.create.none', 'hg.fork.none',
                               'hg.register.manual_activate',
-                              'repository.read', 'group.read']))
+                              'repository.read', 'group.read',
+                              'usergroup.read']))
 
     def test_non_inherited_permissions_from_default_on_user_disabled(self):
         user_model = UserModel()
@@ -417,7 +424,8 @@ class TestPermissions(unittest.TestCase)
         self.assertEqual(u1_auth.permissions['global'],
                          set(['hg.create.repository', 'hg.fork.repository',
                               'hg.register.manual_activate',
-                              'repository.read', 'group.read']))
+                              'repository.read', 'group.read',
+                              'usergroup.read']))
 
     def test_owner_permissions_doesnot_get_overwritten_by_group(self):
         #create repo as USER,
@@ -458,3 +466,60 @@ class TestPermissions(unittest.TestCase)
         u1_auth = AuthUser(user_id=self.u1.user_id)
         self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
                          'repository.admin')
+
+    def _test_def_perm_equal(self, user, change_factor=0):
+        perms = UserToPerm.query()\
+                .filter(UserToPerm.user == user)\
+                .all()
+        self.assertEqual(len(perms),
+                         len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor,
+                         msg=perms)
+
+    def test_set_default_permissions(self):
+        PermissionModel().create_default_permissions(user=self.u1)
+        self._test_def_perm_equal(user=self.u1)
+
+    def test_set_default_permissions_after_one_is_missing(self):
+        PermissionModel().create_default_permissions(user=self.u1)
+        self._test_def_perm_equal(user=self.u1)
+        #now we delete one, it should be re-created after another call
+        perms = UserToPerm.query()\
+                .filter(UserToPerm.user == self.u1)\
+                .all()
+        Session().delete(perms[0])
+        Session().commit()
+
+        self._test_def_perm_equal(user=self.u1, change_factor=-1)
+
+        #create missing one !
+        PermissionModel().create_default_permissions(user=self.u1)
+        self._test_def_perm_equal(user=self.u1)
+
+    @parameterized.expand([
+        ('repository.read', 'repository.none'),
+        ('group.read', 'group.none'),
+        ('usergroup.read', 'usergroup.none'),
+        ('hg.create.repository', 'hg.create.none'),
+        ('hg.fork.repository', 'hg.fork.none'),
+        ('hg.register.manual_activate', 'hg.register.auto_activate',)
+    ])
+    def test_set_default_permissions_after_modification(self, perm, modify_to):
+        PermissionModel().create_default_permissions(user=self.u1)
+        self._test_def_perm_equal(user=self.u1)
+
+        old = Permission.get_by_key(perm)
+        new = Permission.get_by_key(modify_to)
+        self.assertNotEqual(old, None)
+        self.assertNotEqual(new, None)
+
+        #now modify permissions
+        p = UserToPerm.query()\
+                .filter(UserToPerm.user == self.u1)\
+                .filter(UserToPerm.permission == old)\
+                .one()
+        p.permission = new
+        Session().add(p)
+        Session().commit()
+
+        PermissionModel().create_default_permissions(user=self.u1)
+        self._test_def_perm_equal(user=self.u1)