# Copyright (C) 2011-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
import time
from rhodecode.api import jsonrpc_method, JSONRPCError
from rhodecode.api.exc import JSONRPCValidationError
from rhodecode.api.utils import (
Optional, OAttr, get_gist_or_error, get_user_or_error,
has_superadmin_permission)
from rhodecode.model.db import Session, or_
from rhodecode.model.gist import Gist, GistModel
log = logging.getLogger(__name__)
@jsonrpc_method()
def get_gist(request, apiuser, gistid, content=Optional(False)):
"""
Get the specified gist, based on the gist ID.
:param apiuser: This is filled automatically from the |authtoken|.
:type apiuser: AuthUser
:param gistid: Set the id of the private or public gist
:type gistid: str
:param content: Return the gist content. Default is false.
:type content: Optional(bool)
"""
gist = get_gist_or_error(gistid)
content = Optional.extract(content)
if not has_superadmin_permission(apiuser):
if gist.gist_owner != apiuser.user_id:
raise JSONRPCError(f'gist `{gistid}` does not exist')
data = gist.get_api_data()
if content:
from rhodecode.model.gist import GistModel
rev, gist_files = GistModel().get_gist_files(gistid)
data['content'] = {x.path: x.str_content for x in gist_files}
return data
@jsonrpc_method()
def get_gists(request, apiuser, userid=Optional(OAttr('apiuser'))):
"""
Get all gists for given user. If userid is empty returned gists
are for user who called the api
:param apiuser: This is filled automatically from the |authtoken|.
:type apiuser: AuthUser
:param userid: user to get gists for
:type userid: Optional(str or int)
"""
if not has_superadmin_permission(apiuser):
# make sure normal user does not pass someone else userid,
# he is not allowed to do that
if not isinstance(userid, Optional) and userid != apiuser.user_id:
raise JSONRPCError(
'userid is not the same as your user'
)
if isinstance(userid, Optional):
user_id = apiuser.user_id
else:
user_id = get_user_or_error(userid).user_id
gists = []
_gists = Gist().query() \
.filter(or_(
Gist.gist_expires == -1, Gist.gist_expires >= time.time())) \
.filter(Gist.gist_owner == user_id) \
.order_by(Gist.created_on.desc())
for gist in _gists:
gists.append(gist.get_api_data())
return gists
@jsonrpc_method()
def create_gist(
request, apiuser, files, gistid=Optional(None),
owner=Optional(OAttr('apiuser')),
gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1),
acl_level=Optional(Gist.ACL_LEVEL_PUBLIC),
description=Optional('')):
"""
Creates a new Gist.
:param apiuser: This is filled automatically from the |authtoken|.
:type apiuser: AuthUser
:param files: files to be added to the gist. The data structure has
to match the following example::
{'filename1': {'content':'...'}, 'filename2': {'content':'...'}}
:type files: dict
:param gistid: Set a custom id for the gist
:type gistid: Optional(str)
:param owner: Set the gist owner, defaults to api method caller
:type owner: Optional(str or int)
:param gist_type: type of gist ``public`` or ``private``
:type gist_type: Optional(str)
:param lifetime: time in minutes of gist lifetime
:type lifetime: Optional(int)
:param acl_level: acl level for this gist, can be
``acl_public`` or ``acl_private`` If the value is set to
``acl_private`` only logged in users are able to access this gist.
If not set it defaults to ``acl_public``.
:type acl_level: Optional(str)
:param description: gist description
:type description: Optional(str)
Example output:
.. code-block:: bash
id :
result : {
"msg": "created new gist",
"gist": {}
}
error : null
Example error output:
.. code-block:: bash
id :
result : null
error : {
"failed to create gist"
}
"""
from rhodecode.model import validation_schema
from rhodecode.model.validation_schema.schemas import gist_schema
if isinstance(owner, Optional):
owner = apiuser.user_id
owner = get_user_or_error(owner)
lifetime = Optional.extract(lifetime)
schema = gist_schema.GistSchema().bind(
# bind the given values if it's allowed, however the deferred
# validator will still validate it according to other rules
lifetime_options=[lifetime])
try:
nodes = gist_schema.nodes_to_sequence(
files, colander_node=schema.get('nodes'))
schema_data = schema.deserialize(dict(
gistid=Optional.extract(gistid),
description=Optional.extract(description),
gist_type=Optional.extract(gist_type),
lifetime=lifetime,
gist_acl_level=Optional.extract(acl_level),
nodes=nodes
))
# convert to safer format with just KEYs so we sure no duplicates
schema_data['nodes'] = gist_schema.sequence_to_nodes(
schema_data['nodes'], colander_node=schema.get('nodes'))
except validation_schema.Invalid as err:
raise JSONRPCValidationError(colander_exc=err)
try:
gist = GistModel().create(
owner=owner,
gist_id=schema_data['gistid'],
description=schema_data['description'],
gist_mapping=schema_data['nodes'],
gist_type=schema_data['gist_type'],
lifetime=schema_data['lifetime'],
gist_acl_level=schema_data['gist_acl_level'])
Session().commit()
return {
'msg': 'created new gist',
'gist': gist.get_api_data()
}
except Exception:
log.exception('Error occurred during creation of gist')
raise JSONRPCError('failed to create gist')
@jsonrpc_method()
def delete_gist(request, apiuser, gistid):
"""
Deletes existing gist
:param apiuser: filled automatically from apikey
:type apiuser: AuthUser
:param gistid: id of gist to delete
:type gistid: str
Example output:
.. code-block:: bash
id :
result : {
"deleted gist ID: ",
"gist": null
}
error : null
Example error output:
.. code-block:: bash
id :
result : null
error : {
"failed to delete gist ID:"
}
"""
gist = get_gist_or_error(gistid)
if not has_superadmin_permission(apiuser):
if gist.gist_owner != apiuser.user_id:
raise JSONRPCError(f'gist `{gistid}` does not exist')
try:
GistModel().delete(gist)
Session().commit()
return {
'msg': f'deleted gist ID:{gist.gist_access_id}',
'gist': None
}
except Exception:
log.exception('Error occured during gist deletion')
raise JSONRPCError('failed to delete gist ID:%s'
% (gist.gist_access_id,))