##// END OF EJS Templates
feat(configs): deprecared old hooks protocol and ssh wrapper....
feat(configs): deprecared old hooks protocol and ssh wrapper. New defaults are now set on v2 keys, so previous installation are automatically set to new keys. Fallback mode is still available.

File last commit:

r5186:76f12b03 default
r5496:cab50adf default
Show More
permission.py
609 lines | 25.0 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
permissions model for RhodeCode
"""
import collections
import logging
import traceback
from sqlalchemy.exc import DatabaseError
from rhodecode import events
from rhodecode.model import BaseModel
from rhodecode.model.db import (
User, Permission, UserToPerm, UserRepoToPerm, UserRepoGroupToPerm,
UserUserGroupToPerm, UserGroup, UserGroupToPerm, UserToRepoBranchPermission)
from rhodecode.lib.utils2 import str2bool, safe_int
log = logging.getLogger(__name__)
class PermissionModel(BaseModel):
"""
Permissions model for RhodeCode
"""
FORKING_DISABLED = 'hg.fork.none'
FORKING_ENABLED = 'hg.fork.repository'
cls = Permission
global_perms = {
'default_repo_create': None,
# special case for create repos on write access to group
'default_repo_create_on_write': None,
'default_repo_group_create': None,
'default_user_group_create': None,
'default_fork_create': None,
'default_inherit_default_permissions': None,
'default_register': None,
'default_password_reset': None,
'default_extern_activate': None,
# object permissions below
'default_repo_perm': None,
'default_group_perm': None,
'default_user_group_perm': None,
# branch
'default_branch_perm': None,
}
def set_global_permission_choices(self, c_obj, gettext_translator):
_ = gettext_translator
c_obj.repo_perms_choices = [
('repository.none', _('None'),),
('repository.read', _('Read'),),
('repository.write', _('Write'),),
('repository.admin', _('Admin'),)]
c_obj.group_perms_choices = [
('group.none', _('None'),),
('group.read', _('Read'),),
('group.write', _('Write'),),
('group.admin', _('Admin'),)]
c_obj.user_group_perms_choices = [
('usergroup.none', _('None'),),
('usergroup.read', _('Read'),),
('usergroup.write', _('Write'),),
('usergroup.admin', _('Admin'),)]
c_obj.branch_perms_choices = [
('branch.none', _('Protected/No Access'),),
('branch.merge', _('Web merge'),),
('branch.push', _('Push'),),
('branch.push_force', _('Force Push'),)]
c_obj.register_choices = [
('hg.register.none', _('Disabled')),
('hg.register.manual_activate', _('Allowed with manual account activation')),
('hg.register.auto_activate', _('Allowed with automatic account activation'))]
c_obj.password_reset_choices = [
('hg.password_reset.enabled', _('Allow password recovery')),
('hg.password_reset.hidden', _('Hide password recovery link')),
('hg.password_reset.disabled', _('Disable password recovery'))]
c_obj.extern_activate_choices = [
('hg.extern_activate.manual', _('Manual activation of external account')),
('hg.extern_activate.auto', _('Automatic activation of external account'))]
c_obj.repo_create_choices = [
('hg.create.none', _('Disabled')),
('hg.create.repository', _('Enabled'))]
c_obj.repo_create_on_write_choices = [
('hg.create.write_on_repogroup.false', _('Disabled')),
('hg.create.write_on_repogroup.true', _('Enabled'))]
c_obj.user_group_create_choices = [
('hg.usergroup.create.false', _('Disabled')),
('hg.usergroup.create.true', _('Enabled'))]
c_obj.repo_group_create_choices = [
('hg.repogroup.create.false', _('Disabled')),
('hg.repogroup.create.true', _('Enabled'))]
c_obj.fork_choices = [
(self.FORKING_DISABLED, _('Disabled')),
(self.FORKING_ENABLED, _('Enabled'))]
c_obj.inherit_default_permission_choices = [
('hg.inherit_default_perms.false', _('Disabled')),
('hg.inherit_default_perms.true', _('Enabled'))]
def get_default_perms(self, object_perms, suffix):
defaults = {}
for perm in object_perms:
# perms
if perm.permission.permission_name.startswith('repository.'):
defaults['default_repo_perm' + suffix] = perm.permission.permission_name
if perm.permission.permission_name.startswith('group.'):
defaults['default_group_perm' + suffix] = perm.permission.permission_name
if perm.permission.permission_name.startswith('usergroup.'):
defaults['default_user_group_perm' + suffix] = perm.permission.permission_name
# branch
if perm.permission.permission_name.startswith('branch.'):
defaults['default_branch_perm' + suffix] = perm.permission.permission_name
# creation of objects
if perm.permission.permission_name.startswith('hg.create.write_on_repogroup'):
defaults['default_repo_create_on_write' + suffix] = perm.permission.permission_name
elif perm.permission.permission_name.startswith('hg.create.'):
defaults['default_repo_create' + suffix] = perm.permission.permission_name
if perm.permission.permission_name.startswith('hg.fork.'):
defaults['default_fork_create' + suffix] = perm.permission.permission_name
if perm.permission.permission_name.startswith('hg.inherit_default_perms.'):
defaults['default_inherit_default_permissions' + suffix] = perm.permission.permission_name
if perm.permission.permission_name.startswith('hg.repogroup.'):
defaults['default_repo_group_create' + suffix] = perm.permission.permission_name
if perm.permission.permission_name.startswith('hg.usergroup.'):
defaults['default_user_group_create' + suffix] = perm.permission.permission_name
# registration and external account activation
if perm.permission.permission_name.startswith('hg.register.'):
defaults['default_register' + suffix] = perm.permission.permission_name
if perm.permission.permission_name.startswith('hg.password_reset.'):
defaults['default_password_reset' + suffix] = perm.permission.permission_name
if perm.permission.permission_name.startswith('hg.extern_activate.'):
defaults['default_extern_activate' + suffix] = perm.permission.permission_name
return defaults
def _make_new_user_perm(self, user, perm_name):
log.debug('Creating new user permission:%s', perm_name)
new_perm = Permission.get_by_key(perm_name)
if not new_perm:
raise ValueError(f'permission with name {perm_name} not found')
new = UserToPerm()
new.user = user
new.permission = new_perm
return new
def _make_new_user_group_perm(self, user_group, perm_name):
log.debug('Creating new user group permission:%s', perm_name)
new_perm = Permission.get_by_key(perm_name)
if not new_perm:
raise ValueError(f'permission with name {perm_name} not found')
new = UserGroupToPerm()
new.users_group = user_group
new.permission = new_perm
return new
def _keep_perm(self, perm_name, keep_fields):
def get_pat(field_name):
return {
# global perms
'default_repo_create': 'hg.create.',
# special case for create repos on write access to group
'default_repo_create_on_write': 'hg.create.write_on_repogroup.',
'default_repo_group_create': 'hg.repogroup.create.',
'default_user_group_create': 'hg.usergroup.create.',
'default_fork_create': 'hg.fork.',
'default_inherit_default_permissions': 'hg.inherit_default_perms.',
# application perms
'default_register': 'hg.register.',
'default_password_reset': 'hg.password_reset.',
'default_extern_activate': 'hg.extern_activate.',
# object permissions below
'default_repo_perm': 'repository.',
'default_group_perm': 'group.',
'default_user_group_perm': 'usergroup.',
# branch
'default_branch_perm': 'branch.',
}[field_name]
for field in keep_fields:
pat = get_pat(field)
if perm_name.startswith(pat):
return True
return False
def _clear_object_perm(self, object_perms, preserve=None):
preserve = preserve or []
_deleted = []
for perm in object_perms:
perm_name = perm.permission.permission_name
if not self._keep_perm(perm_name, keep_fields=preserve):
_deleted.append(perm_name)
self.sa.delete(perm)
return _deleted
def _clear_user_perms(self, user_id, preserve=None):
perms = self.sa.query(UserToPerm)\
.filter(UserToPerm.user_id == user_id)\
.all()
return self._clear_object_perm(perms, preserve=preserve)
def _clear_user_group_perms(self, user_group_id, preserve=None):
perms = self.sa.query(UserGroupToPerm)\
.filter(UserGroupToPerm.users_group_id == user_group_id)\
.all()
return self._clear_object_perm(perms, preserve=preserve)
def _set_new_object_perms(self, obj_type, to_object, form_result, preserve=None):
# clear current entries, to make this function idempotent
# it will fix even if we define more permissions or permissions
# are somehow missing
preserve = preserve or []
_global_perms = self.global_perms.copy()
if obj_type not in ['user', 'user_group']:
raise ValueError("obj_type must be on of 'user' or 'user_group'")
global_perms = len(_global_perms)
default_user_perms = len(Permission.DEFAULT_USER_PERMISSIONS)
if global_perms != default_user_perms:
raise Exception(
'Inconsistent permissions definition. Got {} vs {}'.format(
global_perms, default_user_perms))
if obj_type == 'user':
self._clear_user_perms(to_object.user_id, preserve)
if obj_type == 'user_group':
self._clear_user_group_perms(to_object.users_group_id, preserve)
# now kill the keys that we want to preserve from the form.
for key in preserve:
del _global_perms[key]
for k in _global_perms.copy():
_global_perms[k] = form_result[k]
# at that stage we validate all are passed inside form_result
for _perm_key, perm_value in _global_perms.items():
if perm_value is None:
raise ValueError('Missing permission for {}'.format(_perm_key))
if obj_type == 'user':
p = self._make_new_user_perm(to_object, perm_value)
self.sa.add(p)
if obj_type == 'user_group':
p = self._make_new_user_group_perm(to_object, perm_value)
self.sa.add(p)
def _set_new_user_perms(self, user, form_result, preserve=None):
return self._set_new_object_perms(
'user', user, form_result, preserve)
def _set_new_user_group_perms(self, user_group, form_result, preserve=None):
return self._set_new_object_perms(
'user_group', user_group, form_result, preserve)
def set_new_user_perms(self, user, form_result):
# calculate what to preserve from what is given in form_result
preserve = set(self.global_perms.keys()).difference(set(form_result.keys()))
return self._set_new_user_perms(user, form_result, preserve)
def set_new_user_group_perms(self, user_group, form_result):
# calculate what to preserve from what is given in form_result
preserve = set(self.global_perms.keys()).difference(set(form_result.keys()))
return self._set_new_user_group_perms(user_group, form_result, preserve)
def create_permissions(self):
"""
Create permissions for whole system
"""
for p in Permission.PERMS:
if not Permission.get_by_key(p[0]):
new_perm = Permission()
new_perm.permission_name = p[0]
new_perm.permission_longname = p[0] # translation err with p[1]
self.sa.add(new_perm)
def _create_default_object_permission(self, obj_type, obj, obj_perms,
force=False):
if obj_type not in ['user', 'user_group']:
raise ValueError("obj_type must be on of 'user' or 'user_group'")
def _get_group(perm_name):
return '.'.join(perm_name.split('.')[:1])
defined_perms_groups = list(map(
_get_group, (x.permission.permission_name for x in obj_perms)))
log.debug('GOT ALREADY DEFINED:%s', obj_perms)
if force:
self._clear_object_perm(obj_perms)
self.sa.commit()
defined_perms_groups = []
# for every default permission that needs to be created, we check if
# it's group is already defined, if it's not we create default perm
for perm_name in Permission.DEFAULT_USER_PERMISSIONS:
gr = _get_group(perm_name)
if gr not in defined_perms_groups:
log.debug('GR:%s not found, creating permission %s',
gr, perm_name)
if obj_type == 'user':
new_perm = self._make_new_user_perm(obj, perm_name)
self.sa.add(new_perm)
if obj_type == 'user_group':
new_perm = self._make_new_user_group_perm(obj, perm_name)
self.sa.add(new_perm)
def create_default_user_permissions(self, user, force=False):
"""
Creates only missing default permissions for user, if force is set it
resets the default permissions for that user
:param user:
:param force:
"""
user = self._get_user(user)
obj_perms = UserToPerm.query().filter(UserToPerm.user == user).all()
return self._create_default_object_permission(
'user', user, obj_perms, force)
def create_default_user_group_permissions(self, user_group, force=False):
"""
Creates only missing default permissions for user group, if force is
set it resets the default permissions for that user group
:param user_group:
:param force:
"""
user_group = self._get_user_group(user_group)
obj_perms = UserToPerm.query().filter(UserGroupToPerm.users_group == user_group).all()
return self._create_default_object_permission(
'user_group', user_group, obj_perms, force)
def update_application_permissions(self, form_result):
if 'perm_user_id' in form_result:
perm_user = User.get(safe_int(form_result['perm_user_id']))
else:
# used mostly to do lookup for default user
perm_user = User.get_by_username(form_result['perm_user_name'])
try:
# stage 1 set anonymous access
if perm_user.username == User.DEFAULT_USER:
perm_user.active = str2bool(form_result['anonymous'])
self.sa.add(perm_user)
# stage 2 reset defaults and set them from form data
self._set_new_user_perms(perm_user, form_result, preserve=[
'default_repo_perm',
'default_group_perm',
'default_user_group_perm',
'default_branch_perm',
'default_repo_group_create',
'default_user_group_create',
'default_repo_create_on_write',
'default_repo_create',
'default_fork_create',
'default_inherit_default_permissions'])
self.sa.commit()
except (DatabaseError,):
log.error(traceback.format_exc())
self.sa.rollback()
raise
def update_user_permissions(self, form_result):
if 'perm_user_id' in form_result:
perm_user = User.get(safe_int(form_result['perm_user_id']))
else:
# used mostly to do lookup for default user
perm_user = User.get_by_username(form_result['perm_user_name'])
try:
# stage 2 reset defaults and set them from form data
self._set_new_user_perms(perm_user, form_result, preserve=[
'default_repo_perm',
'default_group_perm',
'default_user_group_perm',
'default_branch_perm',
'default_register',
'default_password_reset',
'default_extern_activate'])
self.sa.commit()
except (DatabaseError,):
log.error(traceback.format_exc())
self.sa.rollback()
raise
def update_user_group_permissions(self, form_result):
if 'perm_user_group_id' in form_result:
perm_user_group = UserGroup.get(safe_int(form_result['perm_user_group_id']))
else:
# used mostly to do lookup for default user
perm_user_group = UserGroup.get_by_group_name(form_result['perm_user_group_name'])
try:
# stage 2 reset defaults and set them from form data
self._set_new_user_group_perms(perm_user_group, form_result, preserve=[
'default_repo_perm',
'default_group_perm',
'default_user_group_perm',
'default_branch_perm',
'default_register',
'default_password_reset',
'default_extern_activate'])
self.sa.commit()
except (DatabaseError,):
log.error(traceback.format_exc())
self.sa.rollback()
raise
def update_object_permissions(self, form_result):
if 'perm_user_id' in form_result:
perm_user = User.get(safe_int(form_result['perm_user_id']))
else:
# used mostly to do lookup for default user
perm_user = User.get_by_username(form_result['perm_user_name'])
try:
# stage 2 reset defaults and set them from form data
self._set_new_user_perms(perm_user, form_result, preserve=[
'default_repo_group_create',
'default_user_group_create',
'default_repo_create_on_write',
'default_repo_create',
'default_fork_create',
'default_inherit_default_permissions',
'default_branch_perm',
'default_register',
'default_password_reset',
'default_extern_activate'])
# overwrite default repo permissions
if form_result['overwrite_default_repo']:
_def_name = form_result['default_repo_perm'].split('repository.')[-1]
_def = Permission.get_by_key('repository.' + _def_name)
for r2p in self.sa.query(UserRepoToPerm)\
.filter(UserRepoToPerm.user == perm_user)\
.all():
# don't reset PRIVATE repositories
if not r2p.repository.private:
r2p.permission = _def
self.sa.add(r2p)
# overwrite default repo group permissions
if form_result['overwrite_default_group']:
_def_name = form_result['default_group_perm'].split('group.')[-1]
_def = Permission.get_by_key('group.' + _def_name)
for g2p in self.sa.query(UserRepoGroupToPerm)\
.filter(UserRepoGroupToPerm.user == perm_user)\
.all():
g2p.permission = _def
self.sa.add(g2p)
# overwrite default user group permissions
if form_result['overwrite_default_user_group']:
_def_name = form_result['default_user_group_perm'].split('usergroup.')[-1]
# user groups
_def = Permission.get_by_key('usergroup.' + _def_name)
for g2p in self.sa.query(UserUserGroupToPerm)\
.filter(UserUserGroupToPerm.user == perm_user)\
.all():
g2p.permission = _def
self.sa.add(g2p)
# COMMIT
self.sa.commit()
except (DatabaseError,):
log.exception('Failed to set default object permissions')
self.sa.rollback()
raise
# because we've FORCED and update here, make sure we reset all permissions cache
PermissionModel().trigger_permission_flush()
def update_branch_permissions(self, form_result):
if 'perm_user_id' in form_result:
perm_user = User.get(safe_int(form_result['perm_user_id']))
else:
# used mostly to do lookup for default user
perm_user = User.get_by_username(form_result['perm_user_name'])
try:
# stage 2 reset defaults and set them from form data
self._set_new_user_perms(perm_user, form_result, preserve=[
'default_repo_perm',
'default_group_perm',
'default_user_group_perm',
'default_repo_group_create',
'default_user_group_create',
'default_repo_create_on_write',
'default_repo_create',
'default_fork_create',
'default_inherit_default_permissions',
'default_register',
'default_password_reset',
'default_extern_activate'])
# overwrite default branch permissions
if form_result['overwrite_default_branch']:
_def_name = \
form_result['default_branch_perm'].split('branch.')[-1]
_def = Permission.get_by_key('branch.' + _def_name)
user_perms = UserToRepoBranchPermission.query()\
.join(UserToRepoBranchPermission.user_repo_to_perm)\
.filter(UserRepoToPerm.user == perm_user).all()
for g2p in user_perms:
g2p.permission = _def
self.sa.add(g2p)
# COMMIT
self.sa.commit()
except (DatabaseError,):
log.exception('Failed to set default branch permissions')
self.sa.rollback()
raise
def get_users_with_repo_write(self, db_repo):
write_plus = ['repository.write', 'repository.admin']
default_user_id = User.get_default_user_id()
user_write_permissions = collections.OrderedDict()
# write or higher and DEFAULT user for inheritance
for perm in db_repo.permissions():
if perm.permission in write_plus or perm.user_id == default_user_id:
user_write_permissions[perm.user_id] = perm
return user_write_permissions
def get_user_groups_with_repo_write(self, db_repo):
write_plus = ['repository.write', 'repository.admin']
user_group_write_permissions = collections.OrderedDict()
# write or higher and DEFAULT user for inheritance
for p in db_repo.permission_user_groups():
if p.permission in write_plus:
user_group_write_permissions[p.users_group_id] = p
return user_group_write_permissions
def trigger_permission_flush(self, affected_user_ids=None):
affected_user_ids = affected_user_ids or User.get_all_user_ids()
events.trigger(events.UserPermissionsChange(affected_user_ids))
def flush_user_permission_caches(self, changes, affected_user_ids=None):
affected_user_ids = affected_user_ids or []
for change in changes['added'] + changes['updated'] + changes['deleted']:
if change['type'] == 'user':
affected_user_ids.append(change['id'])
if change['type'] == 'user_group':
user_group = UserGroup.get(safe_int(change['id']))
if user_group:
group_members_ids = [x.user_id for x in user_group.members]
affected_user_ids.extend(group_members_ids)
self.trigger_permission_flush(affected_user_ids)
return affected_user_ids