|
|
# Copyright (C) 2011-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import logging
|
|
|
|
|
|
from rhodecode.api import (
|
|
|
jsonrpc_method, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
|
|
|
from rhodecode.api.utils import (
|
|
|
Optional, OAttr, store_update, has_superadmin_permission, get_origin,
|
|
|
get_user_or_error, get_user_group_or_error, get_perm_or_error)
|
|
|
from rhodecode.lib import audit_logger
|
|
|
from rhodecode.lib.auth import HasUserGroupPermissionAnyApi, HasPermissionAnyApi
|
|
|
from rhodecode.lib.exceptions import UserGroupAssignedException
|
|
|
from rhodecode.model.db import Session
|
|
|
from rhodecode.model.permission import PermissionModel
|
|
|
from rhodecode.model.scm import UserGroupList
|
|
|
from rhodecode.model.user_group import UserGroupModel
|
|
|
from rhodecode.model import validation_schema
|
|
|
from rhodecode.model.validation_schema.schemas import user_group_schema
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def get_user_group(request, apiuser, usergroupid):
|
|
|
"""
|
|
|
Returns the data of an existing user group.
|
|
|
|
|
|
This command can only be run using an |authtoken| with admin rights to
|
|
|
the specified repository.
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid: Set the user group from which to return data.
|
|
|
:type usergroupid: str or int
|
|
|
|
|
|
Example error output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
{
|
|
|
"error": null,
|
|
|
"id": <id>,
|
|
|
"result": {
|
|
|
"active": true,
|
|
|
"group_description": "group description",
|
|
|
"group_name": "group name",
|
|
|
"permissions": [
|
|
|
{
|
|
|
"name": "owner-name",
|
|
|
"origin": "owner",
|
|
|
"permission": "usergroup.admin",
|
|
|
"type": "user"
|
|
|
},
|
|
|
{
|
|
|
{
|
|
|
"name": "user name",
|
|
|
"origin": "permission",
|
|
|
"permission": "usergroup.admin",
|
|
|
"type": "user"
|
|
|
},
|
|
|
{
|
|
|
"name": "user group name",
|
|
|
"origin": "permission",
|
|
|
"permission": "usergroup.write",
|
|
|
"type": "user_group"
|
|
|
}
|
|
|
],
|
|
|
"permissions_summary": {
|
|
|
"repositories": {
|
|
|
"aa-root-level-repo-1": "repository.admin"
|
|
|
},
|
|
|
"repositories_groups": {}
|
|
|
},
|
|
|
"owner": "owner name",
|
|
|
"users": [],
|
|
|
"users_group_id": 2
|
|
|
}
|
|
|
}
|
|
|
|
|
|
"""
|
|
|
|
|
|
user_group = get_user_group_or_error(usergroupid)
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have at least read permission for this user group !
|
|
|
_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError('user group `{}` does not exist'.format(
|
|
|
usergroupid))
|
|
|
|
|
|
permissions = []
|
|
|
for _user in user_group.permissions():
|
|
|
user_data = {
|
|
|
'name': _user.username,
|
|
|
'permission': _user.permission,
|
|
|
'origin': get_origin(_user),
|
|
|
'type': "user",
|
|
|
}
|
|
|
permissions.append(user_data)
|
|
|
|
|
|
for _user_group in user_group.permission_user_groups():
|
|
|
user_group_data = {
|
|
|
'name': _user_group.users_group_name,
|
|
|
'permission': _user_group.permission,
|
|
|
'origin': get_origin(_user_group),
|
|
|
'type': "user_group",
|
|
|
}
|
|
|
permissions.append(user_group_data)
|
|
|
|
|
|
data = user_group.get_api_data()
|
|
|
data["permissions"] = permissions
|
|
|
data["permissions_summary"] = UserGroupModel().get_perms_summary(
|
|
|
user_group.users_group_id)
|
|
|
return data
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def get_user_groups(request, apiuser):
|
|
|
"""
|
|
|
Lists all the existing user groups within RhodeCode.
|
|
|
|
|
|
This command can only be run using an |authtoken| with admin rights to
|
|
|
the specified repository.
|
|
|
|
|
|
This command takes the following options:
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
|
|
|
Example error output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : [<user_group_obj>,...]
|
|
|
error : null
|
|
|
"""
|
|
|
|
|
|
include_secrets = has_superadmin_permission(apiuser)
|
|
|
|
|
|
result = []
|
|
|
_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
|
|
|
extras = {'user': apiuser}
|
|
|
for user_group in UserGroupList(UserGroupModel().get_all(),
|
|
|
perm_set=_perms, extra_kwargs=extras):
|
|
|
result.append(
|
|
|
user_group.get_api_data(include_secrets=include_secrets))
|
|
|
return result
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def create_user_group(
|
|
|
request, apiuser, group_name, description=Optional(''),
|
|
|
owner=Optional(OAttr('apiuser')), active=Optional(True),
|
|
|
sync=Optional(None)):
|
|
|
"""
|
|
|
Creates a new user group.
|
|
|
|
|
|
This command can only be run using an |authtoken| with admin rights to
|
|
|
the specified repository.
|
|
|
|
|
|
This command takes the following options:
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param group_name: Set the name of the new user group.
|
|
|
:type group_name: str
|
|
|
:param description: Give a description of the new user group.
|
|
|
:type description: str
|
|
|
:param owner: Set the owner of the new user group.
|
|
|
If not set, the owner is the |authtoken| user.
|
|
|
:type owner: Optional(str or int)
|
|
|
:param active: Set this group as active.
|
|
|
:type active: Optional(``True`` | ``False``)
|
|
|
:param sync: Set enabled or disabled the automatically sync from
|
|
|
external authentication types like ldap. If User Group will be named like
|
|
|
one from e.g ldap and sync flag is enabled members will be synced automatically.
|
|
|
Sync type when enabled via API is set to `manual_api`
|
|
|
:type sync: Optional(``True`` | ``False``)
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result: {
|
|
|
"msg": "created new user group `<groupname>`",
|
|
|
"user_group": <user_group_object>
|
|
|
}
|
|
|
error: null
|
|
|
|
|
|
Example error output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : null
|
|
|
error : {
|
|
|
"user group `<group name>` already exist"
|
|
|
or
|
|
|
"failed to create group `<group name>`"
|
|
|
}
|
|
|
|
|
|
"""
|
|
|
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
if not HasPermissionAnyApi('hg.usergroup.create.true')(user=apiuser):
|
|
|
raise JSONRPCForbidden()
|
|
|
|
|
|
if UserGroupModel().get_by_name(group_name):
|
|
|
raise JSONRPCError(f"user group `{group_name}` already exist")
|
|
|
|
|
|
if isinstance(owner, Optional):
|
|
|
owner = apiuser.user_id
|
|
|
|
|
|
owner = get_user_or_error(owner)
|
|
|
active = Optional.extract(active)
|
|
|
description = Optional.extract(description)
|
|
|
sync = Optional.extract(sync)
|
|
|
|
|
|
# set the sync option based on group_data
|
|
|
group_data = None
|
|
|
if sync:
|
|
|
group_data = {
|
|
|
'extern_type': 'manual_api',
|
|
|
'extern_type_set_by': apiuser.username
|
|
|
}
|
|
|
|
|
|
schema = user_group_schema.UserGroupSchema().bind(
|
|
|
# user caller
|
|
|
user=apiuser)
|
|
|
try:
|
|
|
schema_data = schema.deserialize(dict(
|
|
|
user_group_name=group_name,
|
|
|
user_group_description=description,
|
|
|
user_group_owner=owner.username,
|
|
|
user_group_active=active,
|
|
|
))
|
|
|
except validation_schema.Invalid as err:
|
|
|
raise JSONRPCValidationError(colander_exc=err)
|
|
|
|
|
|
try:
|
|
|
user_group = UserGroupModel().create(
|
|
|
name=schema_data['user_group_name'],
|
|
|
description=schema_data['user_group_description'],
|
|
|
owner=owner,
|
|
|
active=schema_data['user_group_active'], group_data=group_data)
|
|
|
Session().flush()
|
|
|
creation_data = user_group.get_api_data()
|
|
|
audit_logger.store_api(
|
|
|
'user_group.create', action_data={'data': creation_data},
|
|
|
user=apiuser)
|
|
|
Session().commit()
|
|
|
|
|
|
affected_user_ids = [apiuser.user_id, owner.user_id]
|
|
|
PermissionModel().trigger_permission_flush(affected_user_ids)
|
|
|
|
|
|
return {
|
|
|
'msg': 'created new user group `%s`' % group_name,
|
|
|
'user_group': creation_data
|
|
|
}
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during creation of user group")
|
|
|
raise JSONRPCError(f'failed to create group `{group_name}`')
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def update_user_group(request, apiuser, usergroupid, group_name=Optional(''),
|
|
|
description=Optional(''), owner=Optional(None),
|
|
|
active=Optional(True), sync=Optional(None)):
|
|
|
"""
|
|
|
Updates the specified `user group` with the details provided.
|
|
|
|
|
|
This command can only be run using an |authtoken| with admin rights to
|
|
|
the specified repository.
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid: Set the id of the `user group` to update.
|
|
|
:type usergroupid: str or int
|
|
|
:param group_name: Set the new name the `user group`
|
|
|
:type group_name: str
|
|
|
:param description: Give a description for the `user group`
|
|
|
:type description: str
|
|
|
:param owner: Set the owner of the `user group`.
|
|
|
:type owner: Optional(str or int)
|
|
|
:param active: Set the group as active.
|
|
|
:type active: Optional(``True`` | ``False``)
|
|
|
:param sync: Set enabled or disabled the automatically sync from
|
|
|
external authentication types like ldap. If User Group will be named like
|
|
|
one from e.g ldap and sync flag is enabled members will be synced automatically.
|
|
|
Sync type when enabled via API is set to `manual_api`
|
|
|
:type sync: Optional(``True`` | ``False``)
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : {
|
|
|
"msg": 'updated user group ID:<user group id> <user group name>',
|
|
|
"user_group": <user_group_object>
|
|
|
}
|
|
|
error : null
|
|
|
|
|
|
Example error output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : null
|
|
|
error : {
|
|
|
"failed to update user group `<user group name>`"
|
|
|
}
|
|
|
|
|
|
"""
|
|
|
|
|
|
user_group = get_user_group_or_error(usergroupid)
|
|
|
include_secrets = False
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have admin permission for this user group !
|
|
|
_perms = ('usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'user group `{usergroupid}` does not exist')
|
|
|
else:
|
|
|
include_secrets = True
|
|
|
|
|
|
if not isinstance(owner, Optional):
|
|
|
owner = get_user_or_error(owner)
|
|
|
|
|
|
old_data = user_group.get_api_data()
|
|
|
updates = {}
|
|
|
store_update(updates, group_name, 'users_group_name')
|
|
|
store_update(updates, description, 'user_group_description')
|
|
|
store_update(updates, owner, 'user')
|
|
|
store_update(updates, active, 'users_group_active')
|
|
|
|
|
|
sync = Optional.extract(sync)
|
|
|
group_data = None
|
|
|
if sync is True:
|
|
|
group_data = {
|
|
|
'extern_type': 'manual_api',
|
|
|
'extern_type_set_by': apiuser.username
|
|
|
}
|
|
|
if sync is False:
|
|
|
group_data = user_group.group_data
|
|
|
if group_data and "extern_type" in group_data:
|
|
|
del group_data["extern_type"]
|
|
|
|
|
|
try:
|
|
|
UserGroupModel().update(user_group, updates, group_data=group_data)
|
|
|
audit_logger.store_api(
|
|
|
'user_group.edit', action_data={'old_data': old_data},
|
|
|
user=apiuser)
|
|
|
Session().commit()
|
|
|
return {
|
|
|
'msg': 'updated user group ID:{} {}'.format(
|
|
|
user_group.users_group_id, user_group.users_group_name),
|
|
|
'user_group': user_group.get_api_data(
|
|
|
include_secrets=include_secrets)
|
|
|
}
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during update of user group")
|
|
|
raise JSONRPCError(
|
|
|
f'failed to update user group `{usergroupid}`')
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def delete_user_group(request, apiuser, usergroupid):
|
|
|
"""
|
|
|
Deletes the specified `user group`.
|
|
|
|
|
|
This command can only be run using an |authtoken| with admin rights to
|
|
|
the specified repository.
|
|
|
|
|
|
This command takes the following options:
|
|
|
|
|
|
:param apiuser: filled automatically from apikey
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid:
|
|
|
:type usergroupid: int
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : {
|
|
|
"msg": "deleted user group ID:<user_group_id> <user_group_name>"
|
|
|
}
|
|
|
error : null
|
|
|
|
|
|
Example error output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : null
|
|
|
error : {
|
|
|
"failed to delete user group ID:<user_group_id> <user_group_name>"
|
|
|
or
|
|
|
"RepoGroup assigned to <repo_groups_list>"
|
|
|
}
|
|
|
|
|
|
"""
|
|
|
|
|
|
user_group = get_user_group_or_error(usergroupid)
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have admin permission for this user group !
|
|
|
_perms = ('usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'user group `{usergroupid}` does not exist')
|
|
|
|
|
|
old_data = user_group.get_api_data()
|
|
|
try:
|
|
|
UserGroupModel().delete(user_group)
|
|
|
audit_logger.store_api(
|
|
|
'user_group.delete', action_data={'old_data': old_data},
|
|
|
user=apiuser)
|
|
|
Session().commit()
|
|
|
return {
|
|
|
'msg': 'deleted user group ID:{} {}'.format(
|
|
|
user_group.users_group_id, user_group.users_group_name),
|
|
|
'user_group': None
|
|
|
}
|
|
|
except UserGroupAssignedException as e:
|
|
|
log.exception("UserGroupAssigned error")
|
|
|
raise JSONRPCError(str(e))
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during deletion of user group")
|
|
|
raise JSONRPCError(
|
|
|
'failed to delete user group ID:%s %s' %(
|
|
|
user_group.users_group_id, user_group.users_group_name))
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def add_user_to_user_group(request, apiuser, usergroupid, userid):
|
|
|
"""
|
|
|
Adds a user to a `user group`. If the user already exists in the group
|
|
|
this command will return false.
|
|
|
|
|
|
This command can only be run using an |authtoken| with admin rights to
|
|
|
the specified user group.
|
|
|
|
|
|
This command takes the following options:
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid: Set the name of the `user group` to which a
|
|
|
user will be added.
|
|
|
:type usergroupid: int
|
|
|
:param userid: Set the `user_id` of the user to add to the group.
|
|
|
:type userid: int
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : {
|
|
|
"success": True|False # depends on if member is in group
|
|
|
"msg": "added member `<username>` to user group `<groupname>` |
|
|
|
User is already in that group"
|
|
|
|
|
|
}
|
|
|
error : null
|
|
|
|
|
|
Example error output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : null
|
|
|
error : {
|
|
|
"failed to add member to user group `<user_group_name>`"
|
|
|
}
|
|
|
|
|
|
"""
|
|
|
|
|
|
user = get_user_or_error(userid)
|
|
|
user_group = get_user_group_or_error(usergroupid)
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have admin permission for this user group !
|
|
|
_perms = ('usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError('user group `{}` does not exist'.format(
|
|
|
usergroupid))
|
|
|
|
|
|
old_values = user_group.get_api_data()
|
|
|
try:
|
|
|
ugm = UserGroupModel().add_user_to_group(user_group, user)
|
|
|
success = True if ugm is not True else False
|
|
|
msg = 'added member `{}` to user group `{}`'.format(
|
|
|
user.username, user_group.users_group_name
|
|
|
)
|
|
|
msg = msg if success else 'User is already in that group'
|
|
|
if success:
|
|
|
user_data = user.get_api_data()
|
|
|
audit_logger.store_api(
|
|
|
'user_group.edit.member.add',
|
|
|
action_data={'user': user_data, 'old_data': old_values},
|
|
|
user=apiuser)
|
|
|
|
|
|
Session().commit()
|
|
|
|
|
|
return {
|
|
|
'success': success,
|
|
|
'msg': msg
|
|
|
}
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during adding a member to user group")
|
|
|
raise JSONRPCError(
|
|
|
'failed to add member to user group `{}`'.format(
|
|
|
user_group.users_group_name,
|
|
|
)
|
|
|
)
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def remove_user_from_user_group(request, apiuser, usergroupid, userid):
|
|
|
"""
|
|
|
Removes a user from a user group.
|
|
|
|
|
|
* If the specified user is not in the group, this command will return
|
|
|
`false`.
|
|
|
|
|
|
This command can only be run using an |authtoken| with admin rights to
|
|
|
the specified user group.
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid: Sets the user group name.
|
|
|
:type usergroupid: str or int
|
|
|
:param userid: The user you wish to remove from |RCE|.
|
|
|
:type userid: str or int
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result: {
|
|
|
"success": True|False, # depends on if member is in group
|
|
|
"msg": "removed member <username> from user group <groupname> |
|
|
|
User wasn't in group"
|
|
|
}
|
|
|
error: null
|
|
|
|
|
|
"""
|
|
|
|
|
|
user = get_user_or_error(userid)
|
|
|
user_group = get_user_group_or_error(usergroupid)
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have admin permission for this user group !
|
|
|
_perms = ('usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'user group `{usergroupid}` does not exist')
|
|
|
|
|
|
old_values = user_group.get_api_data()
|
|
|
try:
|
|
|
success = UserGroupModel().remove_user_from_group(user_group, user)
|
|
|
msg = 'removed member `{}` from user group `{}`'.format(
|
|
|
user.username, user_group.users_group_name
|
|
|
)
|
|
|
msg = msg if success else "User wasn't in group"
|
|
|
if success:
|
|
|
user_data = user.get_api_data()
|
|
|
audit_logger.store_api(
|
|
|
'user_group.edit.member.delete',
|
|
|
action_data={'user': user_data, 'old_data': old_values},
|
|
|
user=apiuser)
|
|
|
|
|
|
Session().commit()
|
|
|
return {'success': success, 'msg': msg}
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during removing an member from user group")
|
|
|
raise JSONRPCError(
|
|
|
'failed to remove member from user group `{}`'.format(
|
|
|
user_group.users_group_name,
|
|
|
)
|
|
|
)
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def grant_user_permission_to_user_group(
|
|
|
request, apiuser, usergroupid, userid, perm):
|
|
|
"""
|
|
|
Set permissions for a user in a user group.
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid: Set the user group to edit permissions on.
|
|
|
:type usergroupid: str or int
|
|
|
:param userid: Set the user from whom you wish to set permissions.
|
|
|
:type userid: str
|
|
|
:param perm: (usergroup.(none|read|write|admin))
|
|
|
:type perm: str
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : {
|
|
|
"msg": "Granted perm: `<perm_name>` for user: `<username>` in user group: `<user_group_name>`",
|
|
|
"success": true
|
|
|
}
|
|
|
error : null
|
|
|
"""
|
|
|
|
|
|
user_group = get_user_group_or_error(usergroupid)
|
|
|
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have admin permission for this user group !
|
|
|
_perms = ('usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'user group `{usergroupid}` does not exist')
|
|
|
|
|
|
user = get_user_or_error(userid)
|
|
|
perm = get_perm_or_error(perm, prefix='usergroup.')
|
|
|
|
|
|
try:
|
|
|
changes = UserGroupModel().grant_user_permission(
|
|
|
user_group=user_group, user=user, perm=perm)
|
|
|
|
|
|
action_data = {
|
|
|
'added': changes['added'],
|
|
|
'updated': changes['updated'],
|
|
|
'deleted': changes['deleted'],
|
|
|
}
|
|
|
audit_logger.store_api(
|
|
|
'user_group.edit.permissions', action_data=action_data,
|
|
|
user=apiuser)
|
|
|
Session().commit()
|
|
|
PermissionModel().flush_user_permission_caches(changes)
|
|
|
|
|
|
return {
|
|
|
'msg':
|
|
|
'Granted perm: `{}` for user: `{}` in user group: `{}`'.format(
|
|
|
perm.permission_name, user.username,
|
|
|
user_group.users_group_name
|
|
|
),
|
|
|
'success': True
|
|
|
}
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during editing permissions "
|
|
|
"for user in user group")
|
|
|
raise JSONRPCError(
|
|
|
'failed to edit permission for user: '
|
|
|
'`%s` in user group: `%s`' % (
|
|
|
userid, user_group.users_group_name))
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def revoke_user_permission_from_user_group(
|
|
|
request, apiuser, usergroupid, userid):
|
|
|
"""
|
|
|
Revoke a users permissions in a user group.
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid: Set the user group from which to revoke the user
|
|
|
permissions.
|
|
|
:type: usergroupid: str or int
|
|
|
:param userid: Set the userid of the user whose permissions will be
|
|
|
revoked.
|
|
|
:type userid: str
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : {
|
|
|
"msg": "Revoked perm for user: `<username>` in user group: `<user_group_name>`",
|
|
|
"success": true
|
|
|
}
|
|
|
error : null
|
|
|
"""
|
|
|
|
|
|
user_group = get_user_group_or_error(usergroupid)
|
|
|
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have admin permission for this user group !
|
|
|
_perms = ('usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'user group `{usergroupid}` does not exist')
|
|
|
|
|
|
user = get_user_or_error(userid)
|
|
|
|
|
|
try:
|
|
|
changes = UserGroupModel().revoke_user_permission(
|
|
|
user_group=user_group, user=user)
|
|
|
action_data = {
|
|
|
'added': changes['added'],
|
|
|
'updated': changes['updated'],
|
|
|
'deleted': changes['deleted'],
|
|
|
}
|
|
|
audit_logger.store_api(
|
|
|
'user_group.edit.permissions', action_data=action_data,
|
|
|
user=apiuser)
|
|
|
Session().commit()
|
|
|
PermissionModel().flush_user_permission_caches(changes)
|
|
|
|
|
|
return {
|
|
|
'msg': 'Revoked perm for user: `{}` in user group: `{}`'.format(
|
|
|
user.username, user_group.users_group_name
|
|
|
),
|
|
|
'success': True
|
|
|
}
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during editing permissions "
|
|
|
"for user in user group")
|
|
|
raise JSONRPCError(
|
|
|
'failed to edit permission for user: `%s` in user group: `%s`'
|
|
|
% (userid, user_group.users_group_name))
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def grant_user_group_permission_to_user_group(
|
|
|
request, apiuser, usergroupid, sourceusergroupid, perm):
|
|
|
"""
|
|
|
Give one user group permissions to another user group.
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid: Set the user group on which to edit permissions.
|
|
|
:type usergroupid: str or int
|
|
|
:param sourceusergroupid: Set the source user group to which
|
|
|
access/permissions will be granted.
|
|
|
:type sourceusergroupid: str or int
|
|
|
:param perm: (usergroup.(none|read|write|admin))
|
|
|
:type perm: str
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : {
|
|
|
"msg": "Granted perm: `<perm_name>` for user group: `<source_user_group_name>` in user group: `<user_group_name>`",
|
|
|
"success": true
|
|
|
}
|
|
|
error : null
|
|
|
"""
|
|
|
|
|
|
user_group = get_user_group_or_error(sourceusergroupid)
|
|
|
target_user_group = get_user_group_or_error(usergroupid)
|
|
|
perm = get_perm_or_error(perm, prefix='usergroup.')
|
|
|
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have admin permission for this user group !
|
|
|
_perms = ('usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser,
|
|
|
user_group_name=target_user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'to user group `{usergroupid}` does not exist')
|
|
|
|
|
|
# check if we have at least read permission for source user group !
|
|
|
_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'user group `{sourceusergroupid}` does not exist')
|
|
|
|
|
|
try:
|
|
|
changes = UserGroupModel().grant_user_group_permission(
|
|
|
target_user_group=target_user_group,
|
|
|
user_group=user_group, perm=perm)
|
|
|
|
|
|
action_data = {
|
|
|
'added': changes['added'],
|
|
|
'updated': changes['updated'],
|
|
|
'deleted': changes['deleted'],
|
|
|
}
|
|
|
audit_logger.store_api(
|
|
|
'user_group.edit.permissions', action_data=action_data,
|
|
|
user=apiuser)
|
|
|
Session().commit()
|
|
|
PermissionModel().flush_user_permission_caches(changes)
|
|
|
|
|
|
return {
|
|
|
'msg': 'Granted perm: `%s` for user group: `%s` '
|
|
|
'in user group: `%s`' % (
|
|
|
perm.permission_name, user_group.users_group_name,
|
|
|
target_user_group.users_group_name
|
|
|
),
|
|
|
'success': True
|
|
|
}
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during editing permissions "
|
|
|
"for user group in user group")
|
|
|
raise JSONRPCError(
|
|
|
'failed to edit permission for user group: `%s` in '
|
|
|
'user group: `%s`' % (
|
|
|
sourceusergroupid, target_user_group.users_group_name
|
|
|
)
|
|
|
)
|
|
|
|
|
|
|
|
|
@jsonrpc_method()
|
|
|
def revoke_user_group_permission_from_user_group(
|
|
|
request, apiuser, usergroupid, sourceusergroupid):
|
|
|
"""
|
|
|
Revoke the permissions that one user group has to another.
|
|
|
|
|
|
:param apiuser: This is filled automatically from the |authtoken|.
|
|
|
:type apiuser: AuthUser
|
|
|
:param usergroupid: Set the user group on which to edit permissions.
|
|
|
:type usergroupid: str or int
|
|
|
:param sourceusergroupid: Set the user group from which permissions
|
|
|
are revoked.
|
|
|
:type sourceusergroupid: str or int
|
|
|
|
|
|
Example output:
|
|
|
|
|
|
.. code-block:: bash
|
|
|
|
|
|
id : <id_given_in_input>
|
|
|
result : {
|
|
|
"msg": "Revoked perm for user group: `<user_group_name>` in user group: `<target_user_group_name>`",
|
|
|
"success": true
|
|
|
}
|
|
|
error : null
|
|
|
"""
|
|
|
|
|
|
user_group = get_user_group_or_error(sourceusergroupid)
|
|
|
target_user_group = get_user_group_or_error(usergroupid)
|
|
|
|
|
|
if not has_superadmin_permission(apiuser):
|
|
|
# check if we have admin permission for this user group !
|
|
|
_perms = ('usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser,
|
|
|
user_group_name=target_user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'to user group `{usergroupid}` does not exist')
|
|
|
|
|
|
# check if we have at least read permission
|
|
|
# for the source user group !
|
|
|
_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
|
|
|
if not HasUserGroupPermissionAnyApi(*_perms)(
|
|
|
user=apiuser, user_group_name=user_group.users_group_name):
|
|
|
raise JSONRPCError(
|
|
|
f'user group `{sourceusergroupid}` does not exist')
|
|
|
|
|
|
try:
|
|
|
changes = UserGroupModel().revoke_user_group_permission(
|
|
|
target_user_group=target_user_group, user_group=user_group)
|
|
|
action_data = {
|
|
|
'added': changes['added'],
|
|
|
'updated': changes['updated'],
|
|
|
'deleted': changes['deleted'],
|
|
|
}
|
|
|
audit_logger.store_api(
|
|
|
'user_group.edit.permissions', action_data=action_data,
|
|
|
user=apiuser)
|
|
|
Session().commit()
|
|
|
PermissionModel().flush_user_permission_caches(changes)
|
|
|
|
|
|
return {
|
|
|
'msg': 'Revoked perm for user group: '
|
|
|
'`%s` in user group: `%s`' % (
|
|
|
user_group.users_group_name,
|
|
|
target_user_group.users_group_name
|
|
|
),
|
|
|
'success': True
|
|
|
}
|
|
|
except Exception:
|
|
|
log.exception("Error occurred during editing permissions "
|
|
|
"for user group in user group")
|
|
|
raise JSONRPCError(
|
|
|
'failed to edit permission for user group: '
|
|
|
'`%s` in user group: `%s`' % (
|
|
|
sourceusergroupid, target_user_group.users_group_name
|
|
|
)
|
|
|
)
|
|
|
|