##// END OF EJS Templates
fix(config): added default value to celery result and broker
fix(config): added default value to celery result and broker

File last commit:

r5095:aa627a5f default
r5485:633e0a3d default
Show More
user_group_api.py
905 lines | 30.1 KiB | text/x-python | PythonLexer
# Copyright (C) 2011-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
from rhodecode.api import (
jsonrpc_method, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
from rhodecode.api.utils import (
Optional, OAttr, store_update, has_superadmin_permission, get_origin,
get_user_or_error, get_user_group_or_error, get_perm_or_error)
from rhodecode.lib import audit_logger
from rhodecode.lib.auth import HasUserGroupPermissionAnyApi, HasPermissionAnyApi
from rhodecode.lib.exceptions import UserGroupAssignedException
from rhodecode.model.db import Session
from rhodecode.model.permission import PermissionModel
from rhodecode.model.scm import UserGroupList
from rhodecode.model.user_group import UserGroupModel
from rhodecode.model import validation_schema
from rhodecode.model.validation_schema.schemas import user_group_schema
log = logging.getLogger(__name__)
@jsonrpc_method()
def get_user_group(request, apiuser, usergroupid):
    """
    Returns the data of an existing user group.

    Super-admins can read any user group. Every other caller needs at
    least one of `usergroup.read`, `usergroup.write` or `usergroup.admin`
    on the group; without it the group is reported as not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param usergroupid: Set the user group from which to return data.
    :type usergroupid: str or int

    Example output:

    .. code-block:: bash

        {
          "error": null,
          "id": <id>,
          "result": {
            "active": true,
            "group_description": "group description",
            "group_name": "group name",
            "permissions": [
              {
                "name": "owner-name",
                "origin": "owner",
                "permission": "usergroup.admin",
                "type": "user"
              },
              {
                "name": "user name",
                "origin": "permission",
                "permission": "usergroup.admin",
                "type": "user"
              },
              {
                "name": "user group name",
                "origin": "permission",
                "permission": "usergroup.write",
                "type": "user_group"
              }
            ],
            "permissions_summary": {
              "repositories": {
                "aa-root-level-repo-1": "repository.admin"
              },
              "repositories_groups": {}
            },
            "owner": "owner name",
            "users": [],
            "users_group_id": 2
          }
        }
    """
    user_group = get_user_group_or_error(usergroupid)

    if not has_superadmin_permission(apiuser):
        # a caller without read access gets the same answer as for a
        # group that is really missing
        required_perms = (
            'usergroup.read', 'usergroup.write', 'usergroup.admin',)
        can_read = HasUserGroupPermissionAnyApi(*required_perms)(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not can_read:
            raise JSONRPCError('user group `{}` does not exist'.format(
                usergroupid))

    # permissions granted to individual users come first ...
    user_entries = [
        {
            'name': member.username,
            'permission': member.permission,
            'origin': get_origin(member),
            'type': "user",
        }
        for member in user_group.permissions()
    ]
    # ... followed by permissions granted to other user groups
    group_entries = [
        {
            'name': member_group.users_group_name,
            'permission': member_group.permission,
            'origin': get_origin(member_group),
            'type': "user_group",
        }
        for member_group in user_group.permission_user_groups()
    ]

    data = user_group.get_api_data()
    data["permissions"] = user_entries + group_entries
    perms_summary = UserGroupModel().get_perms_summary(
        user_group.users_group_id)
    data["permissions_summary"] = perms_summary
    return data
@jsonrpc_method()
def get_user_groups(request, apiuser):
    """
    Lists the existing user groups within RhodeCode.

    The groups returned by `UserGroupModel().get_all()` are passed through
    `UserGroupList` together with the `usergroup.read`, `usergroup.write`
    and `usergroup.admin` permission set and the calling user. Secrets are
    only requested from `get_api_data` when the caller is a super-admin.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : [<user_group_obj>,...]
        error : null
    """
    include_secrets = has_superadmin_permission(apiuser)

    visible_groups = UserGroupList(
        UserGroupModel().get_all(),
        perm_set=('usergroup.read', 'usergroup.write', 'usergroup.admin',),
        extra_kwargs={'user': apiuser})

    return [
        group.get_api_data(include_secrets=include_secrets)
        for group in visible_groups
    ]
@jsonrpc_method()
def create_user_group(
        request, apiuser, group_name, description=Optional(''),
        owner=Optional(OAttr('apiuser')), active=Optional(True),
        sync=Optional(None)):
    """
    Creates a new user group.

    The caller must be a super-admin or hold the
    `hg.usergroup.create.true` permission, otherwise the call is
    forbidden.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param group_name: Set the name of the new user group.
    :type group_name: str
    :param description: Give a description of the new user group.
    :type description: str
    :param owner: Set the owner of the new user group.
        If not set, the owner is the |authtoken| user.
    :type owner: Optional(str or int)
    :param active: Set this group as active.
    :type active: Optional(``True`` | ``False``)
    :param sync: Enable or disable the automatic sync from external
        authentication types like ldap. If the user group is named like
        one from e.g. ldap and the sync flag is enabled, members will be
        synced automatically.
        Sync type when enabled via API is set to `manual_api`
    :type sync: Optional(``True`` | ``False``)

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result: {
          "msg": "created new user group `<groupname>`",
          "user_group": <user_group_object>
        }
        error: null

    Example error output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : null
        error : {
          "user group `<group name>` already exist"
          or
          "failed to create group `<group name>`"
        }
    """
    may_create = (
        has_superadmin_permission(apiuser)
        or HasPermissionAnyApi('hg.usergroup.create.true')(user=apiuser))
    if not may_create:
        raise JSONRPCForbidden()

    if UserGroupModel().get_by_name(group_name):
        raise JSONRPCError(f"user group `{group_name}` already exist")

    # an untouched `owner` argument means "the calling user"
    owner_ref = apiuser.user_id if isinstance(owner, Optional) else owner
    owner = get_user_or_error(owner_ref)

    active = Optional.extract(active)
    description = Optional.extract(description)
    sync = Optional.extract(sync)

    # group_data only carries sync markers, and only when sync was requested
    group_data = {
        'extern_type': 'manual_api',
        'extern_type_set_by': apiuser.username
    } if sync else None

    # the schema is bound to the calling user
    schema = user_group_schema.UserGroupSchema().bind(user=apiuser)
    raw_values = {
        'user_group_name': group_name,
        'user_group_description': description,
        'user_group_owner': owner.username,
        'user_group_active': active,
    }
    try:
        schema_data = schema.deserialize(raw_values)
    except validation_schema.Invalid as err:
        raise JSONRPCValidationError(colander_exc=err)

    try:
        user_group = UserGroupModel().create(
            name=schema_data['user_group_name'],
            description=schema_data['user_group_description'],
            owner=owner,
            active=schema_data['user_group_active'],
            group_data=group_data)
        Session().flush()

        creation_data = user_group.get_api_data()
        audit_logger.store_api(
            'user_group.create', action_data={'data': creation_data},
            user=apiuser)
        Session().commit()

        # permission flush is triggered for both the caller and the owner
        PermissionModel().trigger_permission_flush(
            [apiuser.user_id, owner.user_id])

        return {
            'msg': 'created new user group `%s`' % group_name,
            'user_group': creation_data
        }
    except Exception:
        log.exception("Error occurred during creation of user group")
        raise JSONRPCError(f'failed to create group `{group_name}`')
@jsonrpc_method()
def update_user_group(request, apiuser, usergroupid, group_name=Optional(''),
                      description=Optional(''), owner=Optional(None),
                      active=Optional(True), sync=Optional(None)):
    """
    Updates the specified `user group` with the details provided.

    The caller must be a super-admin or hold `usergroup.admin` on the
    group; without it the group is reported as not existing. Secrets are
    only included in the returned group data for super-admins.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param usergroupid: Set the id of the `user group` to update.
    :type usergroupid: str or int
    :param group_name: Set the new name the `user group`
    :type group_name: str
    :param description: Give a description for the `user group`
    :type description: str
    :param owner: Set the owner of the `user group`.
    :type owner: Optional(str or int)
    :param active: Set the group as active.
    :type active: Optional(``True`` | ``False``)
    :param sync: Enable or disable the automatic sync from external
        authentication types like ldap. If the user group is named like
        one from e.g. ldap and the sync flag is enabled, members will be
        synced automatically.
        Sync type when enabled via API is set to `manual_api`
    :type sync: Optional(``True`` | ``False``)

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : {
          "msg": 'updated user group ID:<user group id> <user group name>',
          "user_group": <user_group_object>
        }
        error : null

    Example error output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : null
        error : {
          "failed to update user group `<user group name>`"
        }
    """
    user_group = get_user_group_or_error(usergroupid)

    is_superadmin = has_superadmin_permission(apiuser)
    if not is_superadmin:
        # non super-admins must be admin of this particular group
        is_group_admin = HasUserGroupPermissionAnyApi('usergroup.admin')(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not is_group_admin:
            raise JSONRPCError(
                f'user group `{usergroupid}` does not exist')
    # secrets in the response are reserved for super-admins
    include_secrets = bool(is_superadmin)

    if not isinstance(owner, Optional):
        owner = get_user_or_error(owner)

    old_data = user_group.get_api_data()

    updates = {}
    requested_changes = (
        (group_name, 'users_group_name'),
        (description, 'user_group_description'),
        (owner, 'user'),
        (active, 'users_group_active'),
    )
    for value, field_name in requested_changes:
        store_update(updates, value, field_name)

    sync = Optional.extract(sync)
    group_data = None
    if sync is True:
        group_data = {
            'extern_type': 'manual_api',
            'extern_type_set_by': apiuser.username
        }
    elif sync is False:
        # disabling sync drops the extern_type marker from the stored data
        group_data = user_group.group_data
        if group_data and "extern_type" in group_data:
            del group_data["extern_type"]

    try:
        UserGroupModel().update(user_group, updates, group_data=group_data)
        audit_logger.store_api(
            'user_group.edit', action_data={'old_data': old_data},
            user=apiuser)
        Session().commit()

        msg = 'updated user group ID:{} {}'.format(
            user_group.users_group_id, user_group.users_group_name)
        updated_data = user_group.get_api_data(
            include_secrets=include_secrets)
        return {
            'msg': msg,
            'user_group': updated_data
        }
    except Exception:
        log.exception("Error occurred during update of user group")
        raise JSONRPCError(
            f'failed to update user group `{usergroupid}`')
@jsonrpc_method()
def delete_user_group(request, apiuser, usergroupid):
    """
    Deletes the specified `user group`.

    The caller must be a super-admin or hold `usergroup.admin` on the
    group; without it the group is reported as not existing.

    :param apiuser: filled automatically from apikey
    :type apiuser: AuthUser
    :param usergroupid: The user group to delete.
    :type usergroupid: int

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : {
          "msg": "deleted user group ID:<user_group_id> <user_group_name>"
        }
        error : null

    Example error output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : null
        error : {
          "failed to delete user group ID:<user_group_id> <user_group_name>"
          or
          "RepoGroup assigned to <repo_groups_list>"
        }
    """
    user_group = get_user_group_or_error(usergroupid)

    if not has_superadmin_permission(apiuser):
        # non super-admins must be admin of this particular group
        is_group_admin = HasUserGroupPermissionAnyApi('usergroup.admin')(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not is_group_admin:
            raise JSONRPCError(
                f'user group `{usergroupid}` does not exist')

    # snapshot taken before deletion, stored in the audit log entry
    old_data = user_group.get_api_data()

    try:
        UserGroupModel().delete(user_group)
        audit_logger.store_api(
            'user_group.delete', action_data={'old_data': old_data},
            user=apiuser)
        Session().commit()

        deleted_msg = 'deleted user group ID:{} {}'.format(
            user_group.users_group_id, user_group.users_group_name)
        return {
            'msg': deleted_msg,
            'user_group': None
        }
    except UserGroupAssignedException as e:
        # the exception text is passed on to the caller as the error
        log.exception("UserGroupAssigned error")
        raise JSONRPCError(str(e))
    except Exception:
        log.exception("Error occurred during deletion of user group")
        raise JSONRPCError(
            'failed to delete user group ID:%s %s' %(
                user_group.users_group_id, user_group.users_group_name))
@jsonrpc_method()
def add_user_to_user_group(request, apiuser, usergroupid, userid):
    """
    Adds a user to a `user group`.

    When `UserGroupModel().add_user_to_group` returns ``True`` the user is
    treated as already being a member: `success` is false, the message
    says so, and no audit entry is written.

    The caller must be a super-admin or hold `usergroup.admin` on the
    group; without it the group is reported as not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param usergroupid: Set the name of the `user group` to which a
        user will be added.
    :type usergroupid: int
    :param userid: Set the `user_id` of the user to add to the group.
    :type userid: int

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : {
          "success": True|False # depends on if member is in group
          "msg": "added member `<username>` to user group `<groupname>` |
                  User is already in that group"
        }
        error : null

    Example error output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : null
        error : {
          "failed to add member to user group `<user_group_name>`"
        }
    """
    user = get_user_or_error(userid)
    user_group = get_user_group_or_error(usergroupid)

    if not has_superadmin_permission(apiuser):
        # non super-admins must be admin of this particular group
        is_group_admin = HasUserGroupPermissionAnyApi('usergroup.admin')(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not is_group_admin:
            raise JSONRPCError('user group `{}` does not exist'.format(
                usergroupid))

    old_values = user_group.get_api_data()

    try:
        ugm = UserGroupModel().add_user_to_group(user_group, user)
        # a literal True from the model signals "already a member"
        success = ugm is not True

        added_msg = 'added member `{}` to user group `{}`'.format(
            user.username, user_group.users_group_name
        )
        if success:
            msg = added_msg
            user_data = user.get_api_data()
            audit_logger.store_api(
                'user_group.edit.member.add',
                action_data={'user': user_data, 'old_data': old_values},
                user=apiuser)
        else:
            msg = 'User is already in that group'

        Session().commit()

        return {
            'success': success,
            'msg': msg
        }
    except Exception:
        log.exception("Error occurred during adding a member to user group")
        raise JSONRPCError(
            'failed to add member to user group `{}`'.format(
                user_group.users_group_name,
            )
        )
@jsonrpc_method()
def remove_user_from_user_group(request, apiuser, usergroupid, userid):
    """
    Removes a user from a user group.

    * If `UserGroupModel().remove_user_from_group` reports no success,
      the message is "User wasn't in group" and no audit entry is
      written.

    The caller must be a super-admin or hold `usergroup.admin` on the
    group; without it the group is reported as not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param usergroupid: Sets the user group name.
    :type usergroupid: str or int
    :param userid: The user you wish to remove from the user group.
    :type userid: str or int

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result: {
          "success": True|False,  # depends on if member is in group
          "msg": "removed member <username> from user group <groupname> |
                  User wasn't in group"
        }
        error: null
    """
    user = get_user_or_error(userid)
    user_group = get_user_group_or_error(usergroupid)

    if not has_superadmin_permission(apiuser):
        # non super-admins must be admin of this particular group
        is_group_admin = HasUserGroupPermissionAnyApi('usergroup.admin')(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not is_group_admin:
            raise JSONRPCError(
                f'user group `{usergroupid}` does not exist')

    old_values = user_group.get_api_data()

    try:
        success = UserGroupModel().remove_user_from_group(user_group, user)

        removed_msg = 'removed member `{}` from user group `{}`'.format(
            user.username, user_group.users_group_name
        )
        if success:
            msg = removed_msg
            user_data = user.get_api_data()
            audit_logger.store_api(
                'user_group.edit.member.delete',
                action_data={'user': user_data, 'old_data': old_values},
                user=apiuser)
        else:
            msg = "User wasn't in group"

        Session().commit()

        return {'success': success, 'msg': msg}
    except Exception:
        log.exception("Error occurred during removing an member from user group")
        raise JSONRPCError(
            'failed to remove member from user group `{}`'.format(
                user_group.users_group_name,
            )
        )
@jsonrpc_method()
def grant_user_permission_to_user_group(
        request, apiuser, usergroupid, userid, perm):
    """
    Set permissions for a user in a user group.

    The caller must be a super-admin or hold `usergroup.admin` on the
    group; without it the group is reported as not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param usergroupid: Set the user group to edit permissions on.
    :type usergroupid: str or int
    :param userid: Set the user for whom you wish to set permissions.
    :type userid: str
    :param perm: (usergroup.(none|read|write|admin))
    :type perm: str

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : {
          "msg": "Granted perm: `<perm_name>` for user: `<username>` in user group: `<user_group_name>`",
          "success": true
        }
        error : null
    """
    user_group = get_user_group_or_error(usergroupid)

    if not has_superadmin_permission(apiuser):
        # non super-admins must be admin of this particular group
        is_group_admin = HasUserGroupPermissionAnyApi('usergroup.admin')(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not is_group_admin:
            raise JSONRPCError(
                f'user group `{usergroupid}` does not exist')

    user = get_user_or_error(userid)
    perm = get_perm_or_error(perm, prefix='usergroup.')

    try:
        changes = UserGroupModel().grant_user_permission(
            user_group=user_group, user=user, perm=perm)

        # only these three change sets go into the audit log entry
        action_data = {
            change_kind: changes[change_kind]
            for change_kind in ('added', 'updated', 'deleted')
        }
        audit_logger.store_api(
            'user_group.edit.permissions', action_data=action_data,
            user=apiuser)
        Session().commit()
        PermissionModel().flush_user_permission_caches(changes)

        granted_msg = (
            'Granted perm: `{}` for user: `{}` in user group: `{}`'.format(
                perm.permission_name, user.username,
                user_group.users_group_name
            ))
        return {
            'msg': granted_msg,
            'success': True
        }
    except Exception:
        log.exception("Error occurred during editing permissions "
                      "for user in user group")
        raise JSONRPCError(
            'failed to edit permission for user: '
            '`%s` in user group: `%s`' % (
                userid, user_group.users_group_name))
@jsonrpc_method()
def revoke_user_permission_from_user_group(
        request, apiuser, usergroupid, userid):
    """
    Revoke a user's permissions in a user group.

    The caller must be a super-admin or hold `usergroup.admin` on the
    group; without it the group is reported as not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param usergroupid: Set the user group from which to revoke the user
        permissions.
    :type usergroupid: str or int
    :param userid: Set the userid of the user whose permissions will be
        revoked.
    :type userid: str

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : {
          "msg": "Revoked perm for user: `<username>` in user group: `<user_group_name>`",
          "success": true
        }
        error : null
    """
    user_group = get_user_group_or_error(usergroupid)

    if not has_superadmin_permission(apiuser):
        # non super-admins must be admin of this particular group
        is_group_admin = HasUserGroupPermissionAnyApi('usergroup.admin')(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not is_group_admin:
            raise JSONRPCError(
                f'user group `{usergroupid}` does not exist')

    user = get_user_or_error(userid)

    try:
        changes = UserGroupModel().revoke_user_permission(
            user_group=user_group, user=user)

        # only these three change sets go into the audit log entry
        action_data = {
            change_kind: changes[change_kind]
            for change_kind in ('added', 'updated', 'deleted')
        }
        audit_logger.store_api(
            'user_group.edit.permissions', action_data=action_data,
            user=apiuser)
        Session().commit()
        PermissionModel().flush_user_permission_caches(changes)

        revoked_msg = 'Revoked perm for user: `{}` in user group: `{}`'.format(
            user.username, user_group.users_group_name
        )
        return {
            'msg': revoked_msg,
            'success': True
        }
    except Exception:
        log.exception("Error occurred during editing permissions "
                      "for user in user group")
        raise JSONRPCError(
            'failed to edit permission for user: `%s` in user group: `%s`'
            % (userid, user_group.users_group_name))
@jsonrpc_method()
def grant_user_group_permission_to_user_group(
        request, apiuser, usergroupid, sourceusergroupid, perm):
    """
    Give one user group permissions to another user group.

    Unless the caller is a super-admin, they need `usergroup.admin` on
    the target group (`usergroupid`) and at least `usergroup.read` on the
    source group (`sourceusergroupid`). A missing permission is reported
    as the respective group not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param usergroupid: Set the user group on which to edit permissions.
    :type usergroupid: str or int
    :param sourceusergroupid: Set the source user group to which
        access/permissions will be granted.
    :type sourceusergroupid: str or int
    :param perm: (usergroup.(none|read|write|admin))
    :type perm: str

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : {
          "msg": "Granted perm: `<perm_name>` for user group: `<source_user_group_name>` in user group: `<user_group_name>`",
          "success": true
        }
        error : null
    """
    user_group = get_user_group_or_error(sourceusergroupid)
    target_user_group = get_user_group_or_error(usergroupid)
    perm = get_perm_or_error(perm, prefix='usergroup.')

    if not has_superadmin_permission(apiuser):
        # admin rights are required on the group being edited ...
        is_target_admin = HasUserGroupPermissionAnyApi('usergroup.admin')(
            user=apiuser,
            user_group_name=target_user_group.users_group_name)
        if not is_target_admin:
            raise JSONRPCError(
                f'to user group `{usergroupid}` does not exist')

        # ... and at least read rights on the group receiving the permission
        source_perms = (
            'usergroup.read', 'usergroup.write', 'usergroup.admin',)
        can_read_source = HasUserGroupPermissionAnyApi(*source_perms)(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not can_read_source:
            raise JSONRPCError(
                f'user group `{sourceusergroupid}` does not exist')

    try:
        changes = UserGroupModel().grant_user_group_permission(
            target_user_group=target_user_group,
            user_group=user_group, perm=perm)

        # only these three change sets go into the audit log entry
        action_data = {
            change_kind: changes[change_kind]
            for change_kind in ('added', 'updated', 'deleted')
        }
        audit_logger.store_api(
            'user_group.edit.permissions', action_data=action_data,
            user=apiuser)
        Session().commit()
        PermissionModel().flush_user_permission_caches(changes)

        granted_msg = (
            'Granted perm: `%s` for user group: `%s` '
            'in user group: `%s`' % (
                perm.permission_name, user_group.users_group_name,
                target_user_group.users_group_name
            ))
        return {
            'msg': granted_msg,
            'success': True
        }
    except Exception:
        log.exception("Error occurred during editing permissions "
                      "for user group in user group")
        raise JSONRPCError(
            'failed to edit permission for user group: `%s` in '
            'user group: `%s`' % (
                sourceusergroupid, target_user_group.users_group_name
            )
        )
@jsonrpc_method()
def revoke_user_group_permission_from_user_group(
        request, apiuser, usergroupid, sourceusergroupid):
    """
    Revoke the permissions that one user group has to another.

    Unless the caller is a super-admin, they need `usergroup.admin` on
    the target group (`usergroupid`) and at least `usergroup.read` on the
    source group (`sourceusergroupid`). A missing permission is reported
    as the respective group not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param usergroupid: Set the user group on which to edit permissions.
    :type usergroupid: str or int
    :param sourceusergroupid: Set the user group from which permissions
        are revoked.
    :type sourceusergroupid: str or int

    Example output:

    .. code-block:: bash

        id : <id_given_in_input>
        result : {
          "msg": "Revoked perm for user group: `<user_group_name>` in user group: `<target_user_group_name>`",
          "success": true
        }
        error : null
    """
    user_group = get_user_group_or_error(sourceusergroupid)
    target_user_group = get_user_group_or_error(usergroupid)

    if not has_superadmin_permission(apiuser):
        # admin rights are required on the group being edited ...
        is_target_admin = HasUserGroupPermissionAnyApi('usergroup.admin')(
            user=apiuser,
            user_group_name=target_user_group.users_group_name)
        if not is_target_admin:
            raise JSONRPCError(
                f'to user group `{usergroupid}` does not exist')

        # ... and at least read rights on the group losing the permission
        source_perms = (
            'usergroup.read', 'usergroup.write', 'usergroup.admin',)
        can_read_source = HasUserGroupPermissionAnyApi(*source_perms)(
            user=apiuser, user_group_name=user_group.users_group_name)
        if not can_read_source:
            raise JSONRPCError(
                f'user group `{sourceusergroupid}` does not exist')

    try:
        changes = UserGroupModel().revoke_user_group_permission(
            target_user_group=target_user_group, user_group=user_group)

        # only these three change sets go into the audit log entry
        action_data = {
            change_kind: changes[change_kind]
            for change_kind in ('added', 'updated', 'deleted')
        }
        audit_logger.store_api(
            'user_group.edit.permissions', action_data=action_data,
            user=apiuser)
        Session().commit()
        PermissionModel().flush_user_permission_caches(changes)

        revoked_msg = (
            'Revoked perm for user group: '
            '`%s` in user group: `%s`' % (
                user_group.users_group_name,
                target_user_group.users_group_name
            ))
        return {
            'msg': revoked_msg,
            'success': True
        }
    except Exception:
        log.exception("Error occurred during editing permissions "
                      "for user group in user group")
        raise JSONRPCError(
            'failed to edit permission for user group: '
            '`%s` in user group: `%s`' % (
                sourceusergroupid, target_user_group.users_group_name
            )
        )