gist_api.py
254 lines
| 7.9 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2011-2023 RhodeCode GmbH | ||
r1 | # | ||
# This program is free software: you can redistribute it and/or modify | |||
# it under the terms of the GNU Affero General Public License, version 3 | |||
# (only), as published by the Free Software Foundation. | |||
# | |||
# This program is distributed in the hope that it will be useful, | |||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
# GNU General Public License for more details. | |||
# | |||
# You should have received a copy of the GNU Affero General Public License | |||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |||
# | |||
# This program is dual-licensed. If you wish to learn more about the | |||
# RhodeCode Enterprise Edition, including its added features, Support services, | |||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | |||
import logging | |||
import time | |||
from rhodecode.api import jsonrpc_method, JSONRPCError | |||
r523 | from rhodecode.api.exc import JSONRPCValidationError | ||
r1 | from rhodecode.api.utils import ( | ||
Optional, OAttr, get_gist_or_error, get_user_or_error, | |||
has_superadmin_permission) | |||
from rhodecode.model.db import Session, or_ | |||
from rhodecode.model.gist import Gist, GistModel | |||
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
@jsonrpc_method()
def get_gist(request, apiuser, gistid, content=Optional(False)):
    """
    Get the specified gist, based on the gist ID.

    A caller without super-admin permission may only fetch gists whose
    owner id equals the caller's user id; any other gist is reported as
    not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param gistid: Set the id of the private or public gist
    :type gistid: str
    :param content: Return the gist content. Default is false.
    :type content: Optional(bool)
    :raises JSONRPCError: when the caller is not a super-admin and does
        not own the gist.
    """
    gist = get_gist_or_error(gistid)
    content = Optional.extract(content)

    if not has_superadmin_permission(apiuser):
        if gist.gist_owner != apiuser.user_id:
            raise JSONRPCError(f'gist `{gistid}` does not exist')

    data = gist.get_api_data()

    if content:
        # GistModel comes from the module-level import. Only the file list
        # of the returned pair is used, so the first element is discarded.
        _rev, gist_files = GistModel().get_gist_files(gistid)
        # Map each file path to its textual content.
        data['content'] = {x.path: x.str_content for x in gist_files}
    return data
@jsonrpc_method()
def get_gists(request, apiuser, userid=Optional(OAttr('apiuser'))):
    """
    List the gists owned by a user.

    When `userid` is not supplied, the gists of the calling user are
    returned. Only gists that never expire (``gist_expires == -1``) or
    whose expiry time has not passed yet are included, newest first.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param userid: user to get gists for
    :type userid: Optional(str or int)
    """
    userid_given = not isinstance(userid, Optional)

    if not has_superadmin_permission(apiuser):
        # a regular user may only ask for their own gists; passing any
        # other userid is rejected
        if userid_given and userid != apiuser.user_id:
            raise JSONRPCError(
                'userid is not the same as your user'
            )

    if userid_given:
        owner_id = get_user_or_error(userid).user_id
    else:
        owner_id = apiuser.user_id

    query = Gist().query()
    query = query.filter(or_(
        Gist.gist_expires == -1, Gist.gist_expires >= time.time()))
    query = query.filter(Gist.gist_owner == owner_id)
    query = query.order_by(Gist.created_on.desc())

    return [entry.get_api_data() for entry in query]
@jsonrpc_method()
def create_gist(
        request, apiuser, files, gistid=Optional(None),
        owner=Optional(OAttr('apiuser')),
        gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1),
        acl_level=Optional(Gist.ACL_LEVEL_PUBLIC),
        description=Optional('')):
    """
    Creates a new Gist.

    The input is validated against the gist schema first; validation
    problems are reported as a validation error. Any failure while the
    gist is stored is logged and reported as a generic creation error.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param files: files to be added to the gist. The data structure has
        to match the following example::

          {'filename1': {'content':'...'}, 'filename2': {'content':'...'}}

    :type files: dict
    :param gistid: Set a custom id for the gist
    :type gistid: Optional(str)
    :param owner: Set the gist owner, defaults to api method caller
    :type owner: Optional(str or int)
    :param gist_type: type of gist ``public`` or ``private``
    :type gist_type: Optional(str)
    :param lifetime: time in minutes of gist lifetime
    :type lifetime: Optional(int)
    :param acl_level: acl level for this gist, can be
        ``acl_public`` or ``acl_private`` If the value is set to
        ``acl_private`` only logged in users are able to access this gist.
        If not set it defaults to ``acl_public``.
    :type acl_level: Optional(str)
    :param description: gist description
    :type description: Optional(str)

    Example output:

    .. code-block:: bash

      id : <id_given_in_input>
      result : {
        "msg": "created new gist",
        "gist": {}
      }
      error : null

    Example error output:

    .. code-block:: bash

      id : <id_given_in_input>
      result : null
      error : {
        "failed to create gist"
      }

    """
    from rhodecode.model import validation_schema
    from rhodecode.model.validation_schema.schemas import gist_schema

    # fall back to the caller when no explicit owner was passed
    owner_ref = apiuser.user_id if isinstance(owner, Optional) else owner
    owner = get_user_or_error(owner_ref)

    lifetime = Optional.extract(lifetime)
    # the requested lifetime is bound as an allowed option; the deferred
    # validator of the schema still checks it against its other rules
    schema = gist_schema.GistSchema().bind(lifetime_options=[lifetime])

    try:
        node_sequence = gist_schema.nodes_to_sequence(
            files, colander_node=schema.get('nodes'))
        payload = {
            'gistid': Optional.extract(gistid),
            'description': Optional.extract(description),
            'gist_type': Optional.extract(gist_type),
            'lifetime': lifetime,
            'gist_acl_level': Optional.extract(acl_level),
            'nodes': node_sequence,
        }
        validated = schema.deserialize(payload)

        # switch back to a mapping keyed by name, so duplicates cannot occur
        validated['nodes'] = gist_schema.sequence_to_nodes(
            validated['nodes'], colander_node=schema.get('nodes'))
    except validation_schema.Invalid as err:
        raise JSONRPCValidationError(colander_exc=err)

    try:
        new_gist = GistModel().create(
            owner=owner,
            gist_id=validated['gistid'],
            description=validated['description'],
            gist_mapping=validated['nodes'],
            gist_type=validated['gist_type'],
            lifetime=validated['lifetime'],
            gist_acl_level=validated['gist_acl_level'])
        Session().commit()
        response = {
            'msg': 'created new gist',
            'gist': new_gist.get_api_data()
        }
        return response
    except Exception:
        log.exception('Error occurred during creation of gist')
        raise JSONRPCError('failed to create gist')
@jsonrpc_method()
def delete_gist(request, apiuser, gistid):
    """
    Deletes existing gist

    A caller without super-admin permission may only delete gists whose
    owner id equals the caller's user id; any other gist is reported as
    not existing.

    :param apiuser: This is filled automatically from the |authtoken|.
    :type apiuser: AuthUser
    :param gistid: id of gist to delete
    :type gistid: str
    :raises JSONRPCError: when the caller may not access the gist, or when
        the deletion itself fails.

    Example output:

    .. code-block:: bash

      id : <id_given_in_input>
      result : {
        "msg": "deleted gist ID:<gist_id>",
        "gist": null
      }
      error : null

    Example error output:

    .. code-block:: bash

      id : <id_given_in_input>
      result : null
      error : {
        "failed to delete gist ID:<gist_id>"
      }

    """
    gist = get_gist_or_error(gistid)
    if not has_superadmin_permission(apiuser):
        if gist.gist_owner != apiuser.user_id:
            raise JSONRPCError(f'gist `{gistid}` does not exist')

    try:
        GistModel().delete(gist)
        Session().commit()
        return {
            'msg': f'deleted gist ID:{gist.gist_access_id}',
            'gist': None
        }
    except Exception:
        # Log the full traceback, but hand the API client only a generic
        # failure message.
        log.exception('Error occurred during gist deletion')
        raise JSONRPCError(
            f'failed to delete gist ID:{gist.gist_access_id}')