# HG changeset patch # User RhodeCode Admin # Date 2024-10-11 12:41:02 # Node ID cb083474b1746e6185e2494a842dc0e5820d8e0c # Parent 4be441f6c6f157074ed53bec114989bd139640b3 fix(permissions): fixed security problem with apply-to-children functionality breaking permissions for private repositories fixes: RCCE-141 diff --git a/rhodecode/model/repo_group.py b/rhodecode/model/repo_group.py --- a/rhodecode/model/repo_group.py +++ b/rhodecode/model/repo_group.py @@ -1,4 +1,4 @@ -# Copyright (C) 2011-2023 RhodeCode GmbH +# Copyright (C) 2011-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 @@ -30,7 +30,6 @@ import time import traceback import string -from zope.cachedescriptors.property import Lazy as LazyProperty from rhodecode import events from rhodecode.model import BaseModel @@ -38,7 +37,7 @@ from rhodecode.model.db import (_hash_ke Session, RepoGroup, UserRepoGroupToPerm, User, Permission, UserGroupRepoGroupToPerm, UserGroup, Repository) from rhodecode.model.permission import PermissionModel -from rhodecode.model.settings import VcsSettingsModel, SettingsModel +from rhodecode.model.settings import SettingsModel from rhodecode.lib.caching_query import FromCache from rhodecode.lib.utils2 import action_logger_generic @@ -350,46 +349,45 @@ class RepoGroupModel(BaseModel): 'default_user_changed': None } - def _set_perm_user(obj, user, perm): - if isinstance(obj, RepoGroup): - self.grant_user_permission( - repo_group=obj, user=user, perm=perm) - elif isinstance(obj, Repository): + def _set_perm_user(_obj: RepoGroup | Repository, _user_obj: User, _perm): + + if isinstance(_obj, RepoGroup): + self.grant_user_permission(repo_group=_obj, user=_user_obj, perm=_perm) + elif isinstance(_obj, Repository): # private repos will not allow to change the default # permissions using recursive mode - if obj.private and user == User.DEFAULT_USER: + if _obj.private and 
_user_obj.username == User.DEFAULT_USER: + log.debug('Skipping private repo %s for user %s', _obj, _user_obj) return - # we set group permission but we have to switch to repo - # permission - perm = perm.replace('group.', 'repository.') - RepoModel().grant_user_permission( - repo=obj, user=user, perm=perm) + # we set group permission, we have to switch to repo permission definition + new_perm = _perm.replace('group.', 'repository.') + RepoModel().grant_user_permission(repo=_obj, user=_user_obj, perm=new_perm) + + def _set_perm_group(_obj: RepoGroup | Repository, users_group: UserGroup, _perm): + if isinstance(_obj, RepoGroup): + self.grant_user_group_permission(repo_group=_obj, group_name=users_group, perm=_perm) + elif isinstance(_obj, Repository): + # we set group permission, we have to switch to repo permission definition + new_perm = _perm.replace('group.', 'repository.') + RepoModel().grant_user_group_permission(repo=_obj, group_name=users_group, perm=new_perm) - def _set_perm_group(obj, users_group, perm): - if isinstance(obj, RepoGroup): - self.grant_user_group_permission( - repo_group=obj, group_name=users_group, perm=perm) - elif isinstance(obj, Repository): - # we set group permission but we have to switch to repo - # permission - perm = perm.replace('group.', 'repository.') - RepoModel().grant_user_group_permission( - repo=obj, group_name=users_group, perm=perm) + def _revoke_perm_user(_obj: RepoGroup | Repository, _user_obj: User): + if isinstance(_obj, RepoGroup): + self.revoke_user_permission(repo_group=_obj, user=_user_obj) + elif isinstance(_obj, Repository): + # private repos will not allow to change the default + # permissions using recursive mode, also there's no revocation for default user, just update + if _user_obj.username == User.DEFAULT_USER: + log.debug('Skipping private repo %s for user %s', _obj, _user_obj) + return + RepoModel().revoke_user_permission(repo=_obj, user=_user_obj) - def _revoke_perm_user(obj, user): - if isinstance(obj, 
RepoGroup): - self.revoke_user_permission(repo_group=obj, user=user) - elif isinstance(obj, Repository): - RepoModel().revoke_user_permission(repo=obj, user=user) - - def _revoke_perm_group(obj, user_group): - if isinstance(obj, RepoGroup): - self.revoke_user_group_permission( - repo_group=obj, group_name=user_group) - elif isinstance(obj, Repository): - RepoModel().revoke_user_group_permission( - repo=obj, group_name=user_group) + def _revoke_perm_group(_obj: RepoGroup | Repository, user_group: UserGroup): + if isinstance(_obj, RepoGroup): + self.revoke_user_group_permission(repo_group=_obj, group_name=user_group) + elif isinstance(_obj, Repository): + RepoModel().revoke_user_group_permission(repo=_obj, group_name=user_group) # start updates log.debug('Now updating permissions for %s in recursive mode:%s', @@ -423,7 +421,8 @@ class RepoGroupModel(BaseModel): for member_id, perm, member_type in perm_updates: member_id = int(member_id) if member_type == 'user': - member_name = User.get(member_id).username + member_obj = User.get(member_id) + member_name = member_obj.username if isinstance(obj, RepoGroup) and obj == repo_group and member_name == User.DEFAULT_USER: # NOTE(dan): detect if we changed permissions for default user perm_obj = self.sa.query(UserRepoGroupToPerm) \ @@ -434,15 +433,16 @@ class RepoGroupModel(BaseModel): changes['default_user_changed'] = True # this updates also current one if found - _set_perm_user(obj, user=member_id, perm=perm) + _set_perm_user(obj, member_obj, perm) elif member_type == 'user_group': - member_name = UserGroup.get(member_id).users_group_name - if not check_perms or has_group_perm(member_name, - user=cur_user): - _set_perm_group(obj, users_group=member_id, perm=perm) + member_obj = UserGroup.get(member_id) + member_name = member_obj.users_group_name + if not check_perms or has_group_perm(member_name, user=cur_user): + _set_perm_group(obj, member_obj, perm) else: - raise ValueError("member_type must be 'user' or 'user_group' " 
- "got {} instead".format(member_type)) + raise ValueError( + f"member_type must be 'user' or 'user_group' got {member_type} instead" + ) changes['updated'].append( {'change_obj': change_obj, 'type': member_type, @@ -452,17 +452,19 @@ class RepoGroupModel(BaseModel): for member_id, perm, member_type in perm_additions: member_id = int(member_id) if member_type == 'user': - member_name = User.get(member_id).username - _set_perm_user(obj, user=member_id, perm=perm) + member_obj = User.get(member_id) + member_name = member_obj.username + _set_perm_user(obj, member_obj, perm) elif member_type == 'user_group': # check if we have permissions to alter this usergroup - member_name = UserGroup.get(member_id).users_group_name - if not check_perms or has_group_perm(member_name, - user=cur_user): - _set_perm_group(obj, users_group=member_id, perm=perm) + member_obj = UserGroup.get(member_id) + member_name = member_obj.users_group_name + if not check_perms or has_group_perm(member_name, user=cur_user): + _set_perm_group(obj, member_obj, perm) else: - raise ValueError("member_type must be 'user' or 'user_group' " - "got {} instead".format(member_type)) + raise ValueError( + f"member_type must be 'user' or 'user_group' got {member_type} instead" + ) changes['added'].append( {'change_obj': change_obj, 'type': member_type, @@ -472,18 +474,19 @@ class RepoGroupModel(BaseModel): for member_id, perm, member_type in perm_deletions: member_id = int(member_id) if member_type == 'user': - member_name = User.get(member_id).username - _revoke_perm_user(obj, user=member_id) + member_obj = User.get(member_id) + member_name = member_obj.username + _revoke_perm_user(obj, member_obj) elif member_type == 'user_group': # check if we have permissions to alter this usergroup - member_name = UserGroup.get(member_id).users_group_name - if not check_perms or has_group_perm(member_name, - user=cur_user): - _revoke_perm_group(obj, user_group=member_id) + member_obj = UserGroup.get(member_id) + member_name 
= member_obj.users_group_name + if not check_perms or has_group_perm(member_name, user=cur_user): + _revoke_perm_group(obj, member_obj) else: - raise ValueError("member_type must be 'user' or 'user_group' " - "got {} instead".format(member_type)) - + raise ValueError( + f"member_type must be 'user' or 'user_group' got {member_type} instead" + ) changes['deleted'].append( {'change_obj': change_obj, 'type': member_type, 'id': member_id, 'name': member_name, 'new_perm': perm})