# Copyright (C) 2011-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import logging import time from rhodecode.api import jsonrpc_method, JSONRPCError from rhodecode.api.exc import JSONRPCValidationError from rhodecode.api.utils import ( Optional, OAttr, get_gist_or_error, get_user_or_error, has_superadmin_permission) from rhodecode.model.db import Session, or_ from rhodecode.model.gist import Gist, GistModel log = logging.getLogger(__name__) @jsonrpc_method() def get_gist(request, apiuser, gistid, content=Optional(False)): """ Get the specified gist, based on the gist ID. :param apiuser: This is filled automatically from the |authtoken|. :type apiuser: AuthUser :param gistid: Set the id of the private or public gist :type gistid: str :param content: Return the gist content. Default is false. :type content: Optional(bool) """ gist = get_gist_or_error(gistid) content = Optional.extract(content) if not has_superadmin_permission(apiuser): if gist.gist_owner != apiuser.user_id: raise JSONRPCError(f'gist `{gistid}` does not exist') data = gist.get_api_data() if content: from rhodecode.model.gist import GistModel rev, gist_files = GistModel().get_gist_files(gistid) data['content'] = {x.path: x.str_content for x in gist_files} return data @jsonrpc_method() def get_gists(request, apiuser, userid=Optional(OAttr('apiuser'))): """ Get all gists for given user. If userid is empty returned gists are for user who called the api :param apiuser: This is filled automatically from the |authtoken|. :type apiuser: AuthUser :param userid: user to get gists for :type userid: Optional(str or int) """ if not has_superadmin_permission(apiuser): # make sure normal user does not pass someone else userid, # he is not allowed to do that if not isinstance(userid, Optional) and userid != apiuser.user_id: raise JSONRPCError( 'userid is not the same as your user' ) if isinstance(userid, Optional): user_id = apiuser.user_id else: user_id = get_user_or_error(userid).user_id gists = [] _gists = Gist().query() \ .filter(or_( Gist.gist_expires == -1, Gist.gist_expires >= time.time())) \ .filter(Gist.gist_owner == user_id) \ .order_by(Gist.created_on.desc()) for gist in _gists: gists.append(gist.get_api_data()) return gists @jsonrpc_method() def create_gist( request, apiuser, files, gistid=Optional(None), owner=Optional(OAttr('apiuser')), gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1), acl_level=Optional(Gist.ACL_LEVEL_PUBLIC), description=Optional('')): """ Creates a new Gist. :param apiuser: This is filled automatically from the |authtoken|. :type apiuser: AuthUser :param files: files to be added to the gist. The data structure has to match the following example:: {'filename1': {'content':'...'}, 'filename2': {'content':'...'}} :type files: dict :param gistid: Set a custom id for the gist :type gistid: Optional(str) :param owner: Set the gist owner, defaults to api method caller :type owner: Optional(str or int) :param gist_type: type of gist ``public`` or ``private`` :type gist_type: Optional(str) :param lifetime: time in minutes of gist lifetime :type lifetime: Optional(int) :param acl_level: acl level for this gist, can be ``acl_public`` or ``acl_private`` If the value is set to ``acl_private`` only logged in users are able to access this gist. If not set it defaults to ``acl_public``. :type acl_level: Optional(str) :param description: gist description :type description: Optional(str) Example output: .. code-block:: bash id : <id_given_in_input> result : { "msg": "created new gist", "gist": {} } error : null Example error output: .. code-block:: bash id : <id_given_in_input> result : null error : { "failed to create gist" } """ from rhodecode.model import validation_schema from rhodecode.model.validation_schema.schemas import gist_schema if isinstance(owner, Optional): owner = apiuser.user_id owner = get_user_or_error(owner) lifetime = Optional.extract(lifetime) schema = gist_schema.GistSchema().bind( # bind the given values if it's allowed, however the deferred # validator will still validate it according to other rules lifetime_options=[lifetime]) try: nodes = gist_schema.nodes_to_sequence( files, colander_node=schema.get('nodes')) schema_data = schema.deserialize(dict( gistid=Optional.extract(gistid), description=Optional.extract(description), gist_type=Optional.extract(gist_type), lifetime=lifetime, gist_acl_level=Optional.extract(acl_level), nodes=nodes )) # convert to safer format with just KEYs so we sure no duplicates schema_data['nodes'] = gist_schema.sequence_to_nodes( schema_data['nodes'], colander_node=schema.get('nodes')) except validation_schema.Invalid as err: raise JSONRPCValidationError(colander_exc=err) try: gist = GistModel().create( owner=owner, gist_id=schema_data['gistid'], description=schema_data['description'], gist_mapping=schema_data['nodes'], gist_type=schema_data['gist_type'], lifetime=schema_data['lifetime'], gist_acl_level=schema_data['gist_acl_level']) Session().commit() return { 'msg': 'created new gist', 'gist': gist.get_api_data() } except Exception: log.exception('Error occurred during creation of gist') raise JSONRPCError('failed to create gist') @jsonrpc_method() def delete_gist(request, apiuser, gistid): """ Deletes existing gist :param apiuser: filled automatically from apikey :type apiuser: AuthUser :param gistid: id of gist to delete :type gistid: str Example output: .. code-block:: bash id : <id_given_in_input> result : { "deleted gist ID: <gist_id>", "gist": null } error : null Example error output: .. code-block:: bash id : <id_given_in_input> result : null error : { "failed to delete gist ID:<gist_id>" } """ gist = get_gist_or_error(gistid) if not has_superadmin_permission(apiuser): if gist.gist_owner != apiuser.user_id: raise JSONRPCError(f'gist `{gistid}` does not exist') try: GistModel().delete(gist) Session().commit() return { 'msg': f'deleted gist ID:{gist.gist_access_id}', 'gist': None } except Exception: log.exception('Error occured during gist deletion') raise JSONRPCError('failed to delete gist ID:%s' % (gist.gist_access_id,))