# Copyright (C) 2010-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ """ permissions model for RhodeCode """ import collections import logging import traceback from sqlalchemy.exc import DatabaseError from rhodecode import events from rhodecode.model import BaseModel from rhodecode.model.db import ( User, Permission, UserToPerm, UserRepoToPerm, UserRepoGroupToPerm, UserUserGroupToPerm, UserGroup, UserGroupToPerm, UserToRepoBranchPermission) from rhodecode.lib.utils2 import str2bool, safe_int log = logging.getLogger(__name__) class PermissionModel(BaseModel): """ Permissions model for RhodeCode """ FORKING_DISABLED = 'hg.fork.none' FORKING_ENABLED = 'hg.fork.repository' cls = Permission global_perms = { 'default_repo_create': None, # special case for create repos on write access to group 'default_repo_create_on_write': None, 'default_repo_group_create': None, 'default_user_group_create': None, 'default_fork_create': None, 'default_inherit_default_permissions': None, 'default_register': None, 'default_password_reset': None, 'default_extern_activate': None, # object permissions below 'default_repo_perm': None, 'default_group_perm': None, 'default_user_group_perm': None, # branch 'default_branch_perm': None, } def set_global_permission_choices(self, c_obj, gettext_translator): _ = gettext_translator c_obj.repo_perms_choices = [ ('repository.none', _('None'),), ('repository.read', _('Read'),), ('repository.write', _('Write'),), ('repository.admin', _('Admin'),)] c_obj.group_perms_choices = [ ('group.none', _('None'),), ('group.read', _('Read'),), ('group.write', _('Write'),), ('group.admin', _('Admin'),)] c_obj.user_group_perms_choices = [ ('usergroup.none', _('None'),), ('usergroup.read', _('Read'),), ('usergroup.write', _('Write'),), ('usergroup.admin', _('Admin'),)] c_obj.branch_perms_choices = [ ('branch.none', _('Protected/No Access'),), ('branch.merge', _('Web merge'),), ('branch.push', _('Push'),), ('branch.push_force', _('Force Push'),)] c_obj.register_choices = [ ('hg.register.none', _('Disabled')), ('hg.register.manual_activate', _('Allowed with manual account activation')), ('hg.register.auto_activate', _('Allowed with automatic account activation'))] c_obj.password_reset_choices = [ ('hg.password_reset.enabled', _('Allow password recovery')), ('hg.password_reset.hidden', _('Hide password recovery link')), ('hg.password_reset.disabled', _('Disable password recovery'))] c_obj.extern_activate_choices = [ ('hg.extern_activate.manual', _('Manual activation of external account')), ('hg.extern_activate.auto', _('Automatic activation of external account'))] c_obj.repo_create_choices = [ ('hg.create.none', _('Disabled')), ('hg.create.repository', _('Enabled'))] c_obj.repo_create_on_write_choices = [ ('hg.create.write_on_repogroup.false', _('Disabled')), ('hg.create.write_on_repogroup.true', _('Enabled'))] c_obj.user_group_create_choices = [ ('hg.usergroup.create.false', _('Disabled')), ('hg.usergroup.create.true', _('Enabled'))] c_obj.repo_group_create_choices = [ ('hg.repogroup.create.false', _('Disabled')), ('hg.repogroup.create.true', _('Enabled'))] c_obj.fork_choices = [ (self.FORKING_DISABLED, _('Disabled')), (self.FORKING_ENABLED, _('Enabled'))] c_obj.inherit_default_permission_choices = [ ('hg.inherit_default_perms.false', _('Disabled')), ('hg.inherit_default_perms.true', _('Enabled'))] def get_default_perms(self, object_perms, suffix): defaults = {} for perm in object_perms: # perms if perm.permission.permission_name.startswith('repository.'): defaults['default_repo_perm' + suffix] = perm.permission.permission_name if perm.permission.permission_name.startswith('group.'): defaults['default_group_perm' + suffix] = perm.permission.permission_name if perm.permission.permission_name.startswith('usergroup.'): defaults['default_user_group_perm' + suffix] = perm.permission.permission_name # branch if perm.permission.permission_name.startswith('branch.'): defaults['default_branch_perm' + suffix] = perm.permission.permission_name # creation of objects if perm.permission.permission_name.startswith('hg.create.write_on_repogroup'): defaults['default_repo_create_on_write' + suffix] = perm.permission.permission_name elif perm.permission.permission_name.startswith('hg.create.'): defaults['default_repo_create' + suffix] = perm.permission.permission_name if perm.permission.permission_name.startswith('hg.fork.'): defaults['default_fork_create' + suffix] = perm.permission.permission_name if perm.permission.permission_name.startswith('hg.inherit_default_perms.'): defaults['default_inherit_default_permissions' + suffix] = perm.permission.permission_name if perm.permission.permission_name.startswith('hg.repogroup.'): defaults['default_repo_group_create' + suffix] = perm.permission.permission_name if perm.permission.permission_name.startswith('hg.usergroup.'): defaults['default_user_group_create' + suffix] = perm.permission.permission_name # registration and external account activation if perm.permission.permission_name.startswith('hg.register.'): defaults['default_register' + suffix] = perm.permission.permission_name if perm.permission.permission_name.startswith('hg.password_reset.'): defaults['default_password_reset' + suffix] = perm.permission.permission_name if perm.permission.permission_name.startswith('hg.extern_activate.'): defaults['default_extern_activate' + suffix] = perm.permission.permission_name return defaults def _make_new_user_perm(self, user, perm_name): log.debug('Creating new user permission:%s', perm_name) new_perm = Permission.get_by_key(perm_name) if not new_perm: raise ValueError(f'permission with name {perm_name} not found') new = UserToPerm() new.user = user new.permission = new_perm return new def _make_new_user_group_perm(self, user_group, perm_name): log.debug('Creating new user group permission:%s', perm_name) new_perm = Permission.get_by_key(perm_name) if not new_perm: raise ValueError(f'permission with name {perm_name} not found') new = UserGroupToPerm() new.users_group = user_group new.permission = new_perm return new def _keep_perm(self, perm_name, keep_fields): def get_pat(field_name): return { # global perms 'default_repo_create': 'hg.create.', # special case for create repos on write access to group 'default_repo_create_on_write': 'hg.create.write_on_repogroup.', 'default_repo_group_create': 'hg.repogroup.create.', 'default_user_group_create': 'hg.usergroup.create.', 'default_fork_create': 'hg.fork.', 'default_inherit_default_permissions': 'hg.inherit_default_perms.', # application perms 'default_register': 'hg.register.', 'default_password_reset': 'hg.password_reset.', 'default_extern_activate': 'hg.extern_activate.', # object permissions below 'default_repo_perm': 'repository.', 'default_group_perm': 'group.', 'default_user_group_perm': 'usergroup.', # branch 'default_branch_perm': 'branch.', }[field_name] for field in keep_fields: pat = get_pat(field) if perm_name.startswith(pat): return True return False def _clear_object_perm(self, object_perms, preserve=None): preserve = preserve or [] _deleted = [] for perm in object_perms: perm_name = perm.permission.permission_name if not self._keep_perm(perm_name, keep_fields=preserve): _deleted.append(perm_name) self.sa.delete(perm) return _deleted def _clear_user_perms(self, user_id, preserve=None): perms = self.sa.query(UserToPerm)\ .filter(UserToPerm.user_id == user_id)\ .all() return self._clear_object_perm(perms, preserve=preserve) def _clear_user_group_perms(self, user_group_id, preserve=None): perms = self.sa.query(UserGroupToPerm)\ .filter(UserGroupToPerm.users_group_id == user_group_id)\ .all() return self._clear_object_perm(perms, preserve=preserve) def _set_new_object_perms(self, obj_type, to_object, form_result, preserve=None): # clear current entries, to make this function idempotent # it will fix even if we define more permissions or permissions # are somehow missing preserve = preserve or [] _global_perms = self.global_perms.copy() if obj_type not in ['user', 'user_group']: raise ValueError("obj_type must be on of 'user' or 'user_group'") global_perms = len(_global_perms) default_user_perms = len(Permission.DEFAULT_USER_PERMISSIONS) if global_perms != default_user_perms: raise Exception( 'Inconsistent permissions definition. Got {} vs {}'.format( global_perms, default_user_perms)) if obj_type == 'user': self._clear_user_perms(to_object.user_id, preserve) if obj_type == 'user_group': self._clear_user_group_perms(to_object.users_group_id, preserve) # now kill the keys that we want to preserve from the form. for key in preserve: del _global_perms[key] for k in _global_perms.copy(): _global_perms[k] = form_result[k] # at that stage we validate all are passed inside form_result for _perm_key, perm_value in _global_perms.items(): if perm_value is None: raise ValueError('Missing permission for {}'.format(_perm_key)) if obj_type == 'user': p = self._make_new_user_perm(to_object, perm_value) self.sa.add(p) if obj_type == 'user_group': p = self._make_new_user_group_perm(to_object, perm_value) self.sa.add(p) def _set_new_user_perms(self, user, form_result, preserve=None): return self._set_new_object_perms( 'user', user, form_result, preserve) def _set_new_user_group_perms(self, user_group, form_result, preserve=None): return self._set_new_object_perms( 'user_group', user_group, form_result, preserve) def set_new_user_perms(self, user, form_result): # calculate what to preserve from what is given in form_result preserve = set(self.global_perms.keys()).difference(set(form_result.keys())) return self._set_new_user_perms(user, form_result, preserve) def set_new_user_group_perms(self, user_group, form_result): # calculate what to preserve from what is given in form_result preserve = set(self.global_perms.keys()).difference(set(form_result.keys())) return self._set_new_user_group_perms(user_group, form_result, preserve) def create_permissions(self): """ Create permissions for whole system """ for p in Permission.PERMS: if not Permission.get_by_key(p[0]): new_perm = Permission() new_perm.permission_name = p[0] new_perm.permission_longname = p[0] # translation err with p[1] self.sa.add(new_perm) def _create_default_object_permission(self, obj_type, obj, obj_perms, force=False): if obj_type not in ['user', 'user_group']: raise ValueError("obj_type must be on of 'user' or 'user_group'") def _get_group(perm_name): return '.'.join(perm_name.split('.')[:1]) defined_perms_groups = list(map( _get_group, (x.permission.permission_name for x in obj_perms))) log.debug('GOT ALREADY DEFINED:%s', obj_perms) if force: self._clear_object_perm(obj_perms) self.sa.commit() defined_perms_groups = [] # for every default permission that needs to be created, we check if # it's group is already defined, if it's not we create default perm for perm_name in Permission.DEFAULT_USER_PERMISSIONS: gr = _get_group(perm_name) if gr not in defined_perms_groups: log.debug('GR:%s not found, creating permission %s', gr, perm_name) if obj_type == 'user': new_perm = self._make_new_user_perm(obj, perm_name) self.sa.add(new_perm) if obj_type == 'user_group': new_perm = self._make_new_user_group_perm(obj, perm_name) self.sa.add(new_perm) def create_default_user_permissions(self, user, force=False): """ Creates only missing default permissions for user, if force is set it resets the default permissions for that user :param user: :param force: """ user = self._get_user(user) obj_perms = UserToPerm.query().filter(UserToPerm.user == user).all() return self._create_default_object_permission( 'user', user, obj_perms, force) def create_default_user_group_permissions(self, user_group, force=False): """ Creates only missing default permissions for user group, if force is set it resets the default permissions for that user group :param user_group: :param force: """ user_group = self._get_user_group(user_group) obj_perms = UserToPerm.query().filter(UserGroupToPerm.users_group == user_group).all() return self._create_default_object_permission( 'user_group', user_group, obj_perms, force) def update_application_permissions(self, form_result): if 'perm_user_id' in form_result: perm_user = User.get(safe_int(form_result['perm_user_id'])) else: # used mostly to do lookup for default user perm_user = User.get_by_username(form_result['perm_user_name']) try: # stage 1 set anonymous access if perm_user.username == User.DEFAULT_USER: perm_user.active = str2bool(form_result['anonymous']) self.sa.add(perm_user) # stage 2 reset defaults and set them from form data self._set_new_user_perms(perm_user, form_result, preserve=[ 'default_repo_perm', 'default_group_perm', 'default_user_group_perm', 'default_branch_perm', 'default_repo_group_create', 'default_user_group_create', 'default_repo_create_on_write', 'default_repo_create', 'default_fork_create', 'default_inherit_default_permissions']) self.sa.commit() except (DatabaseError,): log.error(traceback.format_exc()) self.sa.rollback() raise def update_user_permissions(self, form_result): if 'perm_user_id' in form_result: perm_user = User.get(safe_int(form_result['perm_user_id'])) else: # used mostly to do lookup for default user perm_user = User.get_by_username(form_result['perm_user_name']) try: # stage 2 reset defaults and set them from form data self._set_new_user_perms(perm_user, form_result, preserve=[ 'default_repo_perm', 'default_group_perm', 'default_user_group_perm', 'default_branch_perm', 'default_register', 'default_password_reset', 'default_extern_activate']) self.sa.commit() except (DatabaseError,): log.error(traceback.format_exc()) self.sa.rollback() raise def update_user_group_permissions(self, form_result): if 'perm_user_group_id' in form_result: perm_user_group = UserGroup.get(safe_int(form_result['perm_user_group_id'])) else: # used mostly to do lookup for default user perm_user_group = UserGroup.get_by_group_name(form_result['perm_user_group_name']) try: # stage 2 reset defaults and set them from form data self._set_new_user_group_perms(perm_user_group, form_result, preserve=[ 'default_repo_perm', 'default_group_perm', 'default_user_group_perm', 'default_branch_perm', 'default_register', 'default_password_reset', 'default_extern_activate']) self.sa.commit() except (DatabaseError,): log.error(traceback.format_exc()) self.sa.rollback() raise def update_object_permissions(self, form_result): if 'perm_user_id' in form_result: perm_user = User.get(safe_int(form_result['perm_user_id'])) else: # used mostly to do lookup for default user perm_user = User.get_by_username(form_result['perm_user_name']) try: # stage 2 reset defaults and set them from form data self._set_new_user_perms(perm_user, form_result, preserve=[ 'default_repo_group_create', 'default_user_group_create', 'default_repo_create_on_write', 'default_repo_create', 'default_fork_create', 'default_inherit_default_permissions', 'default_branch_perm', 'default_register', 'default_password_reset', 'default_extern_activate']) # overwrite default repo permissions if form_result['overwrite_default_repo']: _def_name = form_result['default_repo_perm'].split('repository.')[-1] _def = Permission.get_by_key('repository.' + _def_name) for r2p in self.sa.query(UserRepoToPerm)\ .filter(UserRepoToPerm.user == perm_user)\ .all(): # don't reset PRIVATE repositories if not r2p.repository.private: r2p.permission = _def self.sa.add(r2p) # overwrite default repo group permissions if form_result['overwrite_default_group']: _def_name = form_result['default_group_perm'].split('group.')[-1] _def = Permission.get_by_key('group.' + _def_name) for g2p in self.sa.query(UserRepoGroupToPerm)\ .filter(UserRepoGroupToPerm.user == perm_user)\ .all(): g2p.permission = _def self.sa.add(g2p) # overwrite default user group permissions if form_result['overwrite_default_user_group']: _def_name = form_result['default_user_group_perm'].split('usergroup.')[-1] # user groups _def = Permission.get_by_key('usergroup.' + _def_name) for g2p in self.sa.query(UserUserGroupToPerm)\ .filter(UserUserGroupToPerm.user == perm_user)\ .all(): g2p.permission = _def self.sa.add(g2p) # COMMIT self.sa.commit() except (DatabaseError,): log.exception('Failed to set default object permissions') self.sa.rollback() raise # because we've FORCED and update here, make sure we reset all permissions cache PermissionModel().trigger_permission_flush() def update_branch_permissions(self, form_result): if 'perm_user_id' in form_result: perm_user = User.get(safe_int(form_result['perm_user_id'])) else: # used mostly to do lookup for default user perm_user = User.get_by_username(form_result['perm_user_name']) try: # stage 2 reset defaults and set them from form data self._set_new_user_perms(perm_user, form_result, preserve=[ 'default_repo_perm', 'default_group_perm', 'default_user_group_perm', 'default_repo_group_create', 'default_user_group_create', 'default_repo_create_on_write', 'default_repo_create', 'default_fork_create', 'default_inherit_default_permissions', 'default_register', 'default_password_reset', 'default_extern_activate']) # overwrite default branch permissions if form_result['overwrite_default_branch']: _def_name = \ form_result['default_branch_perm'].split('branch.')[-1] _def = Permission.get_by_key('branch.' + _def_name) user_perms = UserToRepoBranchPermission.query()\ .join(UserToRepoBranchPermission.user_repo_to_perm)\ .filter(UserRepoToPerm.user == perm_user).all() for g2p in user_perms: g2p.permission = _def self.sa.add(g2p) # COMMIT self.sa.commit() except (DatabaseError,): log.exception('Failed to set default branch permissions') self.sa.rollback() raise def get_users_with_repo_write(self, db_repo): write_plus = ['repository.write', 'repository.admin'] default_user_id = User.get_default_user_id() user_write_permissions = collections.OrderedDict() # write or higher and DEFAULT user for inheritance for perm in db_repo.permissions(): if perm.permission in write_plus or perm.user_id == default_user_id: user_write_permissions[perm.user_id] = perm return user_write_permissions def get_user_groups_with_repo_write(self, db_repo): write_plus = ['repository.write', 'repository.admin'] user_group_write_permissions = collections.OrderedDict() # write or higher and DEFAULT user for inheritance for p in db_repo.permission_user_groups(): if p.permission in write_plus: user_group_write_permissions[p.users_group_id] = p return user_group_write_permissions def trigger_permission_flush(self, affected_user_ids=None): affected_user_ids = affected_user_ids or User.get_all_user_ids() events.trigger(events.UserPermissionsChange(affected_user_ids)) def flush_user_permission_caches(self, changes, affected_user_ids=None): affected_user_ids = affected_user_ids or [] for change in changes['added'] + changes['updated'] + changes['deleted']: if change['type'] == 'user': affected_user_ids.append(change['id']) if change['type'] == 'user_group': user_group = UserGroup.get(safe_int(change['id'])) if user_group: group_members_ids = [x.user_id for x in user_group.members] affected_user_ids.extend(group_members_ids) self.trigger_permission_flush(affected_user_ids) return affected_user_ids