|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
permissions model for RhodeCode
|
|
|
"""
|
|
|
import collections
|
|
|
import logging
|
|
|
import traceback
|
|
|
|
|
|
from sqlalchemy.exc import DatabaseError
|
|
|
|
|
|
from rhodecode import events
|
|
|
from rhodecode.model import BaseModel
|
|
|
from rhodecode.model.db import (
|
|
|
User, Permission, UserToPerm, UserRepoToPerm, UserRepoGroupToPerm,
|
|
|
UserUserGroupToPerm, UserGroup, UserGroupToPerm, UserToRepoBranchPermission)
|
|
|
from rhodecode.lib.utils2 import str2bool, safe_int
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class PermissionModel(BaseModel):
    """
    Permissions model for RhodeCode.

    Declares the default-permission form fields and the permission names
    used for the global forking switch.
    """
    # permission names behind the "forking" on/off choice
    FORKING_DISABLED = 'hg.fork.none'
    FORKING_ENABLED = 'hg.fork.repository'

    cls = Permission

    # every default-permission form field, each initialised to None
    global_perms = dict.fromkeys((
        'default_repo_create',
        # special case for create repos on write access to group
        'default_repo_create_on_write',
        'default_repo_group_create',
        'default_user_group_create',
        'default_fork_create',
        'default_inherit_default_permissions',
        'default_register',
        'default_password_reset',
        'default_extern_activate',

        # object permissions below
        'default_repo_perm',
        'default_group_perm',
        'default_user_group_perm',

        # branch
        'default_branch_perm',
    ))
|
|
|
|
|
|
def set_global_permission_choices(self, c_obj, gettext_translator):
    """
    Attach the selectable permission choices to ``c_obj``.

    Every ``*_choices`` attribute set on ``c_obj`` is a list of
    ``(permission_name, translated_label)`` tuples.

    :param c_obj: object receiving the ``*_choices`` attributes
    :param gettext_translator: callable applied to every label
    """
    # the translator is bound to ``_`` so each label stays a literal
    # ``_('...')`` call
    _ = gettext_translator

    choices = {
        'repo_perms_choices': [
            ('repository.none', _('None')),
            ('repository.read', _('Read')),
            ('repository.write', _('Write')),
            ('repository.admin', _('Admin')),
        ],
        'group_perms_choices': [
            ('group.none', _('None')),
            ('group.read', _('Read')),
            ('group.write', _('Write')),
            ('group.admin', _('Admin')),
        ],
        'user_group_perms_choices': [
            ('usergroup.none', _('None')),
            ('usergroup.read', _('Read')),
            ('usergroup.write', _('Write')),
            ('usergroup.admin', _('Admin')),
        ],
        'branch_perms_choices': [
            ('branch.none', _('Protected/No Access')),
            ('branch.merge', _('Web merge')),
            ('branch.push', _('Push')),
            ('branch.push_force', _('Force Push')),
        ],
        'register_choices': [
            ('hg.register.none', _('Disabled')),
            ('hg.register.manual_activate',
             _('Allowed with manual account activation')),
            ('hg.register.auto_activate',
             _('Allowed with automatic account activation')),
        ],
        'password_reset_choices': [
            ('hg.password_reset.enabled', _('Allow password recovery')),
            ('hg.password_reset.hidden', _('Hide password recovery link')),
            ('hg.password_reset.disabled', _('Disable password recovery')),
        ],
        'extern_activate_choices': [
            ('hg.extern_activate.manual',
             _('Manual activation of external account')),
            ('hg.extern_activate.auto',
             _('Automatic activation of external account')),
        ],
        'repo_create_choices': [
            ('hg.create.none', _('Disabled')),
            ('hg.create.repository', _('Enabled')),
        ],
        'repo_create_on_write_choices': [
            ('hg.create.write_on_repogroup.false', _('Disabled')),
            ('hg.create.write_on_repogroup.true', _('Enabled')),
        ],
        'user_group_create_choices': [
            ('hg.usergroup.create.false', _('Disabled')),
            ('hg.usergroup.create.true', _('Enabled')),
        ],
        'repo_group_create_choices': [
            ('hg.repogroup.create.false', _('Disabled')),
            ('hg.repogroup.create.true', _('Enabled')),
        ],
        'fork_choices': [
            (self.FORKING_DISABLED, _('Disabled')),
            (self.FORKING_ENABLED, _('Enabled')),
        ],
        'inherit_default_permission_choices': [
            ('hg.inherit_default_perms.false', _('Disabled')),
            ('hg.inherit_default_perms.true', _('Enabled')),
        ],
    }

    # dicts keep insertion order, so attributes are set in the order above
    for attr_name, options in choices.items():
        setattr(c_obj, attr_name, options)
|
|
|
|
|
|
def get_default_perms(self, object_perms, suffix):
    """
    Map permission entries onto their default-permission form fields.

    :param object_perms: iterable of entries exposing
        ``.permission.permission_name``
    :param suffix: string appended to every field name in the result
    :return: dict of ``field name + suffix`` -> permission name; entries
        whose name matches no known prefix are skipped
    """
    # (name prefix, form field) pairs, checked in order with the first hit
    # winning; 'hg.create.write_on_repogroup' must therefore come before the
    # more general 'hg.create.'
    prefix_to_field = (
        # perms
        ('repository.', 'default_repo_perm'),
        ('group.', 'default_group_perm'),
        ('usergroup.', 'default_user_group_perm'),
        # branch
        ('branch.', 'default_branch_perm'),
        # creation of objects
        ('hg.create.write_on_repogroup', 'default_repo_create_on_write'),
        ('hg.create.', 'default_repo_create'),
        ('hg.fork.', 'default_fork_create'),
        ('hg.inherit_default_perms.', 'default_inherit_default_permissions'),
        ('hg.repogroup.', 'default_repo_group_create'),
        ('hg.usergroup.', 'default_user_group_create'),
        # registration and external account activation
        ('hg.register.', 'default_register'),
        ('hg.password_reset.', 'default_password_reset'),
        ('hg.extern_activate.', 'default_extern_activate'),
    )

    defaults = {}
    for entry in object_perms:
        name = entry.permission.permission_name
        for prefix, field in prefix_to_field:
            if name.startswith(prefix):
                defaults[field + suffix] = name
                break
    return defaults
|
|
|
|
|
|
def _make_new_user_perm(self, user, perm_name):
    """
    Build a ``UserToPerm`` entry linking ``user`` with a permission.

    The entry is only created and returned here, not added to the session.

    :param user: value assigned to the entry's ``user`` attribute
    :param perm_name: key used to look the permission up
    :raises ValueError: when no permission exists for ``perm_name``
    """
    log.debug('Creating new user permission:%s', perm_name)
    permission = Permission.get_by_key(perm_name)
    if permission:
        user_perm = UserToPerm()
        user_perm.user = user
        user_perm.permission = permission
        return user_perm
    raise ValueError(f'permission with name {perm_name} not found')
|
|
|
|
|
|
def _make_new_user_group_perm(self, user_group, perm_name):
    """
    Build a ``UserGroupToPerm`` entry linking ``user_group`` with a permission.

    The entry is only created and returned here, not added to the session.

    :param user_group: value assigned to the entry's ``users_group`` attribute
    :param perm_name: key used to look the permission up
    :raises ValueError: when no permission exists for ``perm_name``
    """
    log.debug('Creating new user group permission:%s', perm_name)
    permission = Permission.get_by_key(perm_name)
    if permission:
        group_perm = UserGroupToPerm()
        group_perm.users_group = user_group
        group_perm.permission = permission
        return group_perm
    raise ValueError(f'permission with name {perm_name} not found')
|
|
|
|
|
|
def _keep_perm(self, perm_name, keep_fields):
|
|
|
def get_pat(field_name):
|
|
|
return {
|
|
|
# global perms
|
|
|
'default_repo_create': 'hg.create.',
|
|
|
# special case for create repos on write access to group
|
|
|
'default_repo_create_on_write': 'hg.create.write_on_repogroup.',
|
|
|
'default_repo_group_create': 'hg.repogroup.create.',
|
|
|
'default_user_group_create': 'hg.usergroup.create.',
|
|
|
'default_fork_create': 'hg.fork.',
|
|
|
'default_inherit_default_permissions': 'hg.inherit_default_perms.',
|
|
|
|
|
|
# application perms
|
|
|
'default_register': 'hg.register.',
|
|
|
'default_password_reset': 'hg.password_reset.',
|
|
|
'default_extern_activate': 'hg.extern_activate.',
|
|
|
|
|
|
# object permissions below
|
|
|
'default_repo_perm': 'repository.',
|
|
|
'default_group_perm': 'group.',
|
|
|
'default_user_group_perm': 'usergroup.',
|
|
|
# branch
|
|
|
'default_branch_perm': 'branch.',
|
|
|
|
|
|
}[field_name]
|
|
|
for field in keep_fields:
|
|
|
pat = get_pat(field)
|
|
|
if perm_name.startswith(pat):
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
def _clear_object_perm(self, object_perms, preserve=None):
|
|
|
preserve = preserve or []
|
|
|
_deleted = []
|
|
|
for perm in object_perms:
|
|
|
perm_name = perm.permission.permission_name
|
|
|
if not self._keep_perm(perm_name, keep_fields=preserve):
|
|
|
_deleted.append(perm_name)
|
|
|
self.sa.delete(perm)
|
|
|
return _deleted
|
|
|
|
|
|
def _clear_user_perms(self, user_id, preserve=None):
    """
    Delete the ``UserToPerm`` entries of one user.

    :param user_id: id matched against ``UserToPerm.user_id``
    :param preserve: field names whose permissions are left alone
    :return: whatever ``_clear_object_perm`` returns for the matched entries
    """
    user_entries = self.sa.query(UserToPerm).filter(
        UserToPerm.user_id == user_id)
    return self._clear_object_perm(user_entries.all(), preserve=preserve)
|
|
|
|
|
|
def _clear_user_group_perms(self, user_group_id, preserve=None):
    """
    Delete the ``UserGroupToPerm`` entries of one user group.

    :param user_group_id: id matched against ``UserGroupToPerm.users_group_id``
    :param preserve: field names whose permissions are left alone
    :return: whatever ``_clear_object_perm`` returns for the matched entries
    """
    group_entries = self.sa.query(UserGroupToPerm).filter(
        UserGroupToPerm.users_group_id == user_group_id)
    return self._clear_object_perm(group_entries.all(), preserve=preserve)
|
|
|
|
|
|
def _set_new_object_perms(self, obj_type, to_object, form_result, preserve=None):
    """
    Replace the default permissions of a user or user group with form data.

    Current entries are cleared and re-created, which makes this function
    idempotent: it also repairs the state when more permissions get defined
    or some are missing. Fields listed in ``preserve`` are left untouched.

    All values are read from ``form_result`` and validated *before* any
    existing entry is deleted, so invalid input no longer leaves pending
    deletes behind in the session.

    :param obj_type: ``'user'`` or ``'user_group'``
    :param to_object: object whose ``user_id`` / ``users_group_id`` selects
        the entries to clear and which the new entries are linked to
    :param form_result: mapping of field name -> permission name
    :param preserve: iterable of ``global_perms`` field names to skip
    :raises ValueError: for an unknown ``obj_type`` or a ``None`` form value
    :raises KeyError: when a needed field is missing from ``form_result`` or
        a ``preserve`` entry is not in ``global_perms``
    :raises Exception: when ``global_perms`` and
        ``Permission.DEFAULT_USER_PERMISSIONS`` differ in length
    """
    preserve = preserve or []
    if obj_type not in ['user', 'user_group']:
        raise ValueError("obj_type must be on of 'user' or 'user_group'")

    global_perms = len(self.global_perms)
    default_user_perms = len(Permission.DEFAULT_USER_PERMISSIONS)
    if global_perms != default_user_perms:
        raise Exception(
            'Inconsistent permissions definition. Got {} vs {}'.format(
                global_perms, default_user_perms))

    # drop the preserved fields, then pull every remaining value out of the
    # form
    requested = self.global_perms.copy()
    for key in preserve:
        del requested[key]
    requested = {field: form_result[field] for field in requested}

    # validate everything up front, before touching existing entries
    for field, perm_name in requested.items():
        if perm_name is None:
            raise ValueError('Missing permission for {}'.format(field))

    if obj_type == 'user':
        self._clear_user_perms(to_object.user_id, preserve)
        make_perm = self._make_new_user_perm
    else:
        self._clear_user_group_perms(to_object.users_group_id, preserve)
        make_perm = self._make_new_user_group_perm

    for perm_name in requested.values():
        self.sa.add(make_perm(to_object, perm_name))
|
|
|
|
|
|
def _set_new_user_perms(self, user, form_result, preserve=None):
|
|
|
return self._set_new_object_perms(
|
|
|
'user', user, form_result, preserve)
|
|
|
|
|
|
def _set_new_user_group_perms(self, user_group, form_result, preserve=None):
|
|
|
return self._set_new_object_perms(
|
|
|
'user_group', user_group, form_result, preserve)
|
|
|
|
|
|
def set_new_user_perms(self, user, form_result):
    """
    Set user permissions from ``form_result``, preserving absent fields.

    Every ``global_perms`` field that has no key in ``form_result`` is
    passed on as a field to preserve.

    :param user: forwarded to ``_set_new_user_perms``
    :param form_result: mapping of field name -> permission name
    :return: the result of ``_set_new_user_perms``
    """
    # whatever the form does not mention has to be preserved
    absent_fields = set(self.global_perms).difference(form_result.keys())
    return self._set_new_user_perms(
        user, form_result, preserve=absent_fields)
|
|
|
|
|
|
def set_new_user_group_perms(self, user_group, form_result):
    """
    Set user group permissions from ``form_result``, preserving absent fields.

    Every ``global_perms`` field that has no key in ``form_result`` is
    passed on as a field to preserve.

    :param user_group: forwarded to ``_set_new_user_group_perms``
    :param form_result: mapping of field name -> permission name
    :return: the result of ``_set_new_user_group_perms``
    """
    # whatever the form does not mention has to be preserved
    absent_fields = set(self.global_perms).difference(form_result.keys())
    return self._set_new_user_group_perms(
        user_group, form_result, preserve=absent_fields)
|
|
|
|
|
|
def create_permissions(self):
    """
    Create permissions for whole system

    Adds a ``Permission`` to the session for every entry of
    ``Permission.PERMS`` whose key cannot be found yet.
    """
    for perm_def in Permission.PERMS:
        perm_key = perm_def[0]
        if Permission.get_by_key(perm_key):
            # already there, nothing to create
            continue

        permission = Permission()
        permission.permission_name = perm_key
        # the key doubles as long name; using perm_def[1] gave a
        # translation error
        permission.permission_longname = perm_key
        self.sa.add(permission)
|
|
|
|
|
|
def _create_default_object_permission(self, obj_type, obj, obj_perms,
                                      force=False):
    """
    Add the default permissions that ``obj`` does not have yet.

    :param obj_type: ``'user'`` or ``'user_group'``
    :param obj: object the new permission entries are linked to
    :param obj_perms: entries already defined for ``obj``, each exposing
        ``.permission.permission_name``
    :param force: when true, ``obj_perms`` are cleared and committed first,
        so every default permission is created again
    :raises ValueError: for an unknown ``obj_type``
    """
    if obj_type not in ['user', 'user_group']:
        raise ValueError("obj_type must be on of 'user' or 'user_group'")

    def _group_of(perm_name):
        # NOTE(review): only the first dotted segment is used, so all
        # 'hg.*' permissions share the single group 'hg' - confirm this
        # coarse grouping is intended
        return perm_name.partition('.')[0]

    existing_groups = [
        _group_of(entry.permission.permission_name) for entry in obj_perms]
    log.debug('GOT ALREADY DEFINED:%s', obj_perms)

    if force:
        self._clear_object_perm(obj_perms)
        self.sa.commit()
        existing_groups = []

    if obj_type == 'user':
        make_perm = self._make_new_user_perm
    else:
        make_perm = self._make_new_user_group_perm

    # a default permission is only created when nothing of its group is
    # defined yet
    for perm_name in Permission.DEFAULT_USER_PERMISSIONS:
        group = _group_of(perm_name)
        if group in existing_groups:
            continue
        log.debug('GR:%s not found, creating permission %s',
                  group, perm_name)
        self.sa.add(make_perm(obj, perm_name))
|
|
|
|
|
|
def create_default_user_permissions(self, user, force=False):
    """
    Creates only missing default permissions for user, if force is set it
    resets the default permissions for that user

    :param user: resolved through ``self._get_user``
    :param force: forwarded to ``_create_default_object_permission``
    :return: the result of ``_create_default_object_permission``
    """
    db_user = self._get_user(user)
    existing_perms = UserToPerm.query().filter(
        UserToPerm.user == db_user).all()
    return self._create_default_object_permission(
        'user', db_user, existing_perms, force)
|
|
|
|
|
|
def create_default_user_group_permissions(self, user_group, force=False):
    """
    Creates only missing default permissions for user group, if force is
    set it resets the default permissions for that user group

    :param user_group: resolved through ``self._get_user_group``
    :param force: forwarded to ``_create_default_object_permission``
    :return: the result of ``_create_default_object_permission``
    """
    user_group = self._get_user_group(user_group)
    # query the same entity that is filtered on; selecting UserToPerm while
    # filtering on UserGroupToPerm returned user permission rows instead of
    # the permissions of this user group
    obj_perms = UserGroupToPerm.query().filter(
        UserGroupToPerm.users_group == user_group).all()
    return self._create_default_object_permission(
        'user_group', user_group, obj_perms, force)
|
|
|
|
|
|
def update_application_permissions(self, form_result):
    """
    Update the application-level default permissions of a user and commit.

    The user is looked up by ``form_result['perm_user_id']`` when present,
    otherwise by ``form_result['perm_user_name']``. For the default user the
    ``active`` flag is additionally set from ``form_result['anonymous']``.
    Object, branch and creation related fields are preserved.

    :param form_result: mapping with the submitted form values
    :raises DatabaseError: re-raised after logging and a session rollback
    """
    if 'perm_user_id' in form_result:
        perm_user = User.get(safe_int(form_result['perm_user_id']))
    else:
        # used mostly to do lookup for default user
        perm_user = User.get_by_username(form_result['perm_user_name'])

    try:
        # stage 1 set anonymous access
        if perm_user.username == User.DEFAULT_USER:
            perm_user.active = str2bool(form_result['anonymous'])
            self.sa.add(perm_user)

        # stage 2 reset defaults and set them from form data
        self._set_new_user_perms(perm_user, form_result, preserve=[
            'default_repo_perm',
            'default_group_perm',
            'default_user_group_perm',
            'default_branch_perm',

            'default_repo_group_create',
            'default_user_group_create',
            'default_repo_create_on_write',
            'default_repo_create',
            'default_fork_create',
            'default_inherit_default_permissions'])

        self.sa.commit()
    except (DatabaseError,):
        # log.exception records the active traceback itself
        log.exception('Failed to update application permissions')
        self.sa.rollback()
        raise
|
|
|
|
|
|
def update_user_permissions(self, form_result):
    """
    Update the global (creation/fork/inherit) permissions of a user and
    commit.

    The user is looked up by ``form_result['perm_user_id']`` when present,
    otherwise by ``form_result['perm_user_name']``. Object, branch and
    application related fields are preserved.

    :param form_result: mapping with the submitted form values
    :raises DatabaseError: re-raised after logging and a session rollback
    """
    if 'perm_user_id' in form_result:
        perm_user = User.get(safe_int(form_result['perm_user_id']))
    else:
        # used mostly to do lookup for default user
        perm_user = User.get_by_username(form_result['perm_user_name'])

    try:
        # stage 2 reset defaults and set them from form data
        self._set_new_user_perms(perm_user, form_result, preserve=[
            'default_repo_perm',
            'default_group_perm',
            'default_user_group_perm',
            'default_branch_perm',

            'default_register',
            'default_password_reset',
            'default_extern_activate'])
        self.sa.commit()
    except (DatabaseError,):
        # log.exception records the active traceback itself
        log.exception('Failed to update user permissions')
        self.sa.rollback()
        raise
|
|
|
|
|
|
def update_user_group_permissions(self, form_result):
    """
    Update the global (creation/fork/inherit) permissions of a user group
    and commit.

    The group is looked up by ``form_result['perm_user_group_id']`` when
    present, otherwise by ``form_result['perm_user_group_name']``. Object,
    branch and application related fields are preserved.

    :param form_result: mapping with the submitted form values
    :raises DatabaseError: re-raised after logging and a session rollback
    """
    if 'perm_user_group_id' in form_result:
        perm_user_group = UserGroup.get(
            safe_int(form_result['perm_user_group_id']))
    else:
        # no id given, fall back to a lookup by group name
        perm_user_group = UserGroup.get_by_group_name(
            form_result['perm_user_group_name'])

    try:
        # stage 2 reset defaults and set them from form data
        self._set_new_user_group_perms(perm_user_group, form_result, preserve=[
            'default_repo_perm',
            'default_group_perm',
            'default_user_group_perm',
            'default_branch_perm',

            'default_register',
            'default_password_reset',
            'default_extern_activate'])
        self.sa.commit()
    except (DatabaseError,):
        # log.exception records the active traceback itself
        log.exception('Failed to update user group permissions')
        self.sa.rollback()
        raise
|
|
|
|
|
|
def update_object_permissions(self, form_result):
    """
    Update the default object permissions of a user and commit.

    The user is looked up by ``form_result['perm_user_id']`` when present,
    otherwise by ``form_result['perm_user_name']``. When the matching
    ``overwrite_default_*`` flag is set, the user's existing repository,
    repository group and user group permission entries are overwritten with
    the new default; entries of private repositories are not reset.
    After a successful commit a permission flush is triggered.

    :param form_result: mapping with the submitted form values
    :raises DatabaseError: re-raised after logging and a session rollback
    """
    if 'perm_user_id' in form_result:
        perm_user = User.get(safe_int(form_result['perm_user_id']))
    else:
        # used mostly to do lookup for default user
        perm_user = User.get_by_username(form_result['perm_user_name'])

    # everything that is not an object permission stays as it is
    untouched_fields = [
        'default_repo_group_create',
        'default_user_group_create',
        'default_repo_create_on_write',
        'default_repo_create',
        'default_fork_create',
        'default_inherit_default_permissions',
        'default_branch_perm',

        'default_register',
        'default_password_reset',
        'default_extern_activate',
    ]

    try:
        # stage 2 reset defaults and set them from form data
        self._set_new_user_perms(
            perm_user, form_result, preserve=untouched_fields)

        # overwrite default repo permissions
        if form_result['overwrite_default_repo']:
            level = form_result['default_repo_perm'].split('repository.')[-1]
            repo_default = Permission.get_by_key('repository.' + level)
            repo_entries = self.sa.query(UserRepoToPerm).filter(
                UserRepoToPerm.user == perm_user).all()
            for repo_entry in repo_entries:
                # don't reset PRIVATE repositories
                if repo_entry.repository.private:
                    continue
                repo_entry.permission = repo_default
                self.sa.add(repo_entry)

        # overwrite default repo group permissions
        if form_result['overwrite_default_group']:
            level = form_result['default_group_perm'].split('group.')[-1]
            group_default = Permission.get_by_key('group.' + level)
            group_entries = self.sa.query(UserRepoGroupToPerm).filter(
                UserRepoGroupToPerm.user == perm_user).all()
            for group_entry in group_entries:
                group_entry.permission = group_default
                self.sa.add(group_entry)

        # overwrite default user group permissions
        if form_result['overwrite_default_user_group']:
            level = form_result['default_user_group_perm'].split(
                'usergroup.')[-1]
            user_group_default = Permission.get_by_key('usergroup.' + level)
            user_group_entries = self.sa.query(UserUserGroupToPerm).filter(
                UserUserGroupToPerm.user == perm_user).all()
            for user_group_entry in user_group_entries:
                user_group_entry.permission = user_group_default
                self.sa.add(user_group_entry)

        # COMMIT
        self.sa.commit()
    except (DatabaseError,):
        log.exception('Failed to set default object permissions')
        self.sa.rollback()
        raise

    # because we've FORCED an update here, make sure we reset all
    # permissions cache
    PermissionModel().trigger_permission_flush()
|
|
|
|
|
|
def update_branch_permissions(self, form_result):
    """
    Update the default branch permission of a user and commit.

    The user is looked up by ``form_result['perm_user_id']`` when present,
    otherwise by ``form_result['perm_user_name']``. When
    ``form_result['overwrite_default_branch']`` is set, the user's existing
    branch permission entries are overwritten with the new default.

    :param form_result: mapping with the submitted form values
    :raises DatabaseError: re-raised after logging and a session rollback
    """
    if 'perm_user_id' in form_result:
        perm_user = User.get(safe_int(form_result['perm_user_id']))
    else:
        # used mostly to do lookup for default user
        perm_user = User.get_by_username(form_result['perm_user_name'])

    # everything except the branch permission stays as it is
    untouched_fields = [
        'default_repo_perm',
        'default_group_perm',
        'default_user_group_perm',

        'default_repo_group_create',
        'default_user_group_create',
        'default_repo_create_on_write',
        'default_repo_create',
        'default_fork_create',
        'default_inherit_default_permissions',

        'default_register',
        'default_password_reset',
        'default_extern_activate',
    ]

    try:
        # stage 2 reset defaults and set them from form data
        self._set_new_user_perms(
            perm_user, form_result, preserve=untouched_fields)

        # overwrite default branch permissions
        if form_result['overwrite_default_branch']:
            level = form_result['default_branch_perm'].split('branch.')[-1]
            branch_default = Permission.get_by_key('branch.' + level)

            branch_query = UserToRepoBranchPermission.query().join(
                UserToRepoBranchPermission.user_repo_to_perm)
            branch_entries = branch_query.filter(
                UserRepoToPerm.user == perm_user).all()

            for branch_entry in branch_entries:
                branch_entry.permission = branch_default
                self.sa.add(branch_entry)

        # COMMIT
        self.sa.commit()
    except (DatabaseError,):
        log.exception('Failed to set default branch permissions')
        self.sa.rollback()
        raise
|
|
|
|
|
|
def get_users_with_repo_write(self, db_repo):
    """
    Collect the user permission entries of ``db_repo`` with write access.

    The entry of the default user is always included, whatever its
    permission, since it is needed for inheritance.

    :param db_repo: object whose ``permissions()`` yields entries exposing
        ``.permission`` and ``.user_id``
    :return: ``OrderedDict`` of user id -> permission entry
    """
    write_levels = ('repository.write', 'repository.admin')
    default_user_id = User.get_default_user_id()

    # write or higher and DEFAULT user for inheritance
    return collections.OrderedDict(
        (entry.user_id, entry)
        for entry in db_repo.permissions()
        if entry.permission in write_levels
        or entry.user_id == default_user_id)
|
|
|
|
|
|
def get_user_groups_with_repo_write(self, db_repo):
    """
    Collect the user group permission entries of ``db_repo`` with write
    access.

    :param db_repo: object whose ``permission_user_groups()`` yields entries
        exposing ``.permission`` and ``.users_group_id``
    :return: ``OrderedDict`` of user group id -> permission entry
    """
    write_levels = ('repository.write', 'repository.admin')

    # write or higher only
    return collections.OrderedDict(
        (entry.users_group_id, entry)
        for entry in db_repo.permission_user_groups()
        if entry.permission in write_levels)
|
|
|
|
|
|
def trigger_permission_flush(self, affected_user_ids=None):
    """
    Trigger a ``UserPermissionsChange`` event for the given users.

    :param affected_user_ids: ids passed to the event; when ``None`` or
        empty, the result of ``User.get_all_user_ids()`` is used instead
    """
    if not affected_user_ids:
        affected_user_ids = User.get_all_user_ids()
    change_event = events.UserPermissionsChange(affected_user_ids)
    events.trigger(change_event)
|
|
|
|
|
|
def flush_user_permission_caches(self, changes, affected_user_ids=None):
    """
    Trigger a permission flush for every user touched by ``changes``.

    ``'user'`` changes contribute their own id, ``'user_group'`` changes
    contribute the ids of all members of the group (if the group can be
    found). The collected ids are appended to ``affected_user_ids``.

    :param changes: mapping with ``'added'``, ``'updated'`` and
        ``'deleted'`` lists of dicts carrying ``'type'`` and ``'id'``
    :param affected_user_ids: optional initial ids; the passed sequence is
        copied and never modified
    :return: new list with the initial ids followed by the collected ones
    """
    # work on a copy so a list handed in by the caller is not mutated
    affected_user_ids = list(affected_user_ids or [])

    all_changes = changes['added'] + changes['updated'] + changes['deleted']
    for change in all_changes:
        change_type = change['type']
        if change_type == 'user':
            affected_user_ids.append(change['id'])
        elif change_type == 'user_group':
            user_group = UserGroup.get(safe_int(change['id']))
            if user_group:
                affected_user_ids.extend(
                    member.user_id for member in user_group.members)

    self.trigger_permission_flush(affected_user_ids)

    return affected_user_ids
|
|
|
|