# -*- coding: utf-8 -*-
# Copyright (C) 2013-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
gist controller for RhodeCode
"""
import time
import logging
import traceback
import formencode
from formencode import htmlfill
from pylons import request, response, tmpl_context as c, url
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _
from rhodecode.model.forms import GistForm
from rhodecode.model.gist import GistModel
from rhodecode.model.meta import Session
from rhodecode.model.db import Gist, User
from rhodecode.lib import auth
from rhodecode.lib import helpers as h
from rhodecode.lib.base import BaseController, render
from rhodecode.lib.auth import LoginRequired, NotAnonymous
from rhodecode.lib.utils import jsonify
from rhodecode.lib.utils2 import safe_str, safe_int, time_to_datetime
from rhodecode.lib.ext_json import json
from webob.exc import HTTPNotFound, HTTPForbidden
from sqlalchemy.sql.expression import or_
from rhodecode.lib.vcs.exceptions import VCSError, NodeNotChangedError
log = logging.getLogger(__name__)
class GistsController(BaseController):
"""REST Controller styled on the Atom Publishing Protocol"""
def __load_defaults(self, extra_values=None):
c.lifetime_values = [
(str(-1), _('forever')),
(str(5), _('5 minutes')),
(str(60), _('1 hour')),
(str(60 * 24), _('1 day')),
(str(60 * 24 * 30), _('1 month')),
]
if extra_values:
c.lifetime_values.append(extra_values)
c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
c.acl_options = [
(Gist.ACL_LEVEL_PRIVATE, _("Requires registered account")),
(Gist.ACL_LEVEL_PUBLIC, _("Can be accessed by anonymous users"))
]
@LoginRequired()
def index(self):
"""GET /admin/gists: All items in the collection"""
# url('gists')
not_default_user = c.rhodecode_user.username != User.DEFAULT_USER
c.show_private = request.GET.get('private') and not_default_user
c.show_public = request.GET.get('public') and not_default_user
c.show_all = request.GET.get('all') and c.rhodecode_user.admin
gists = _gists = Gist().query()\
.filter(or_(Gist.gist_expires == -1, Gist.gist_expires >= time.time()))\
.order_by(Gist.created_on.desc())
c.active = 'public'
# MY private
if c.show_private and not c.show_public:
gists = _gists.filter(Gist.gist_type == Gist.GIST_PRIVATE)\
.filter(Gist.gist_owner == c.rhodecode_user.user_id)
c.active = 'my_private'
# MY public
elif c.show_public and not c.show_private:
gists = _gists.filter(Gist.gist_type == Gist.GIST_PUBLIC)\
.filter(Gist.gist_owner == c.rhodecode_user.user_id)
c.active = 'my_public'
# MY public+private
elif c.show_private and c.show_public:
gists = _gists.filter(or_(Gist.gist_type == Gist.GIST_PUBLIC,
Gist.gist_type == Gist.GIST_PRIVATE))\
.filter(Gist.gist_owner == c.rhodecode_user.user_id)
c.active = 'my_all'
# Show all by super-admin
elif c.show_all:
c.active = 'all'
gists = _gists
# default show ALL public gists
if not c.show_public and not c.show_private and not c.show_all:
gists = _gists.filter(Gist.gist_type == Gist.GIST_PUBLIC)
c.active = 'public'
from rhodecode.lib.utils import PartialRenderer
_render = PartialRenderer('data_table/_dt_elements.html')
data = []
for gist in gists:
data.append({
'created_on': _render('gist_created', gist.created_on),
'created_on_raw': gist.created_on,
'type': _render('gist_type', gist.gist_type),
'access_id': _render('gist_access_id', gist.gist_access_id, gist.owner.full_contact),
'author': _render('gist_author', gist.owner.full_contact, gist.created_on, gist.gist_expires),
'author_raw': h.escape(gist.owner.full_contact),
'expires': _render('gist_expires', gist.gist_expires),
'description': _render('gist_description', gist.gist_description)
})
c.data = json.dumps(data)
return render('admin/gists/index.html')
@LoginRequired()
@NotAnonymous()
@auth.CSRFRequired()
def create(self):
"""POST /admin/gists: Create a new item"""
# url('gists')
self.__load_defaults()
gist_form = GistForm([x[0] for x in c.lifetime_values],
[x[0] for x in c.acl_options])()
try:
form_result = gist_form.to_python(dict(request.POST))
# TODO: multiple files support, from the form
filename = form_result['filename'] or Gist.DEFAULT_FILENAME
nodes = {
filename: {
'content': form_result['content'],
'lexer': form_result['mimetype'] # None is autodetect
}
}
_public = form_result['public']
gist_type = Gist.GIST_PUBLIC if _public else Gist.GIST_PRIVATE
gist_acl_level = form_result.get(
'acl_level', Gist.ACL_LEVEL_PRIVATE)
gist = GistModel().create(
description=form_result['description'],
owner=c.rhodecode_user.user_id,
gist_mapping=nodes,
gist_type=gist_type,
lifetime=form_result['lifetime'],
gist_id=form_result['gistid'],
gist_acl_level=gist_acl_level
)
Session().commit()
new_gist_id = gist.gist_access_id
except formencode.Invalid as errors:
defaults = errors.value
return formencode.htmlfill.render(
render('admin/gists/new.html'),
defaults=defaults,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8",
force_defaults=False
)
except Exception:
log.exception("Exception while trying to create a gist")
h.flash(_('Error occurred during gist creation'), category='error')
return redirect(url('new_gist'))
return redirect(url('gist', gist_id=new_gist_id))
@LoginRequired()
@NotAnonymous()
def new(self, format='html'):
"""GET /admin/gists/new: Form to create a new item"""
# url('new_gist')
self.__load_defaults()
return render('admin/gists/new.html')
@LoginRequired()
@NotAnonymous()
@auth.CSRFRequired()
def delete(self, gist_id):
"""DELETE /admin/gists/gist_id: Delete an existing item"""
# Forms posted to this method should contain a hidden field:
#
# Or using helpers:
# h.form(url('gist', gist_id=ID),
# method='delete')
# url('gist', gist_id=ID)
c.gist = Gist.get_or_404(gist_id)
owner = c.gist.gist_owner == c.rhodecode_user.user_id
if not (h.HasPermissionAny('hg.admin')() or owner):
raise HTTPForbidden()
GistModel().delete(c.gist)
Session().commit()
h.flash(_('Deleted gist %s') % c.gist.gist_access_id, category='success')
return redirect(url('gists'))
def _add_gist_to_context(self, gist_id):
c.gist = Gist.get_or_404(gist_id)
# Check if this gist is expired
if c.gist.gist_expires != -1:
if time.time() > c.gist.gist_expires:
log.error(
'Gist expired at %s', time_to_datetime(c.gist.gist_expires))
raise HTTPNotFound()
# check if this gist requires a login
is_default_user = c.rhodecode_user.username == User.DEFAULT_USER
if c.gist.acl_level == Gist.ACL_LEVEL_PRIVATE and is_default_user:
log.error("Anonymous user %s tried to access protected gist `%s`",
c.rhodecode_user, gist_id)
raise HTTPNotFound()
@LoginRequired()
def show(self, gist_id, revision='tip', format='html', f_path=None):
"""GET /admin/gists/gist_id: Show a specific item"""
# url('gist', gist_id=ID)
self._add_gist_to_context(gist_id)
c.render = not request.GET.get('no-render', False)
try:
c.file_last_commit, c.files = GistModel().get_gist_files(
gist_id, revision=revision)
except VCSError:
log.exception("Exception in gist show")
raise HTTPNotFound()
if format == 'raw':
content = '\n\n'.join([f.content for f in c.files if (f_path is None or f.path == f_path)])
response.content_type = 'text/plain'
return content
return render('admin/gists/show.html')
@LoginRequired()
@NotAnonymous()
@auth.CSRFRequired()
def edit(self, gist_id):
self._add_gist_to_context(gist_id)
owner = c.gist.gist_owner == c.rhodecode_user.user_id
if not (h.HasPermissionAny('hg.admin')() or owner):
raise HTTPForbidden()
rpost = request.POST
nodes = {}
_file_data = zip(rpost.getall('org_files'), rpost.getall('files'),
rpost.getall('mimetypes'), rpost.getall('contents'))
for org_filename, filename, mimetype, content in _file_data:
nodes[org_filename] = {
'org_filename': org_filename,
'filename': filename,
'content': content,
'lexer': mimetype,
}
try:
GistModel().update(
gist=c.gist,
description=rpost['description'],
owner=c.gist.owner,
gist_mapping=nodes,
gist_type=c.gist.gist_type,
lifetime=rpost['lifetime'],
gist_acl_level=rpost['acl_level']
)
Session().commit()
h.flash(_('Successfully updated gist content'), category='success')
except NodeNotChangedError:
# raised if nothing was changed in repo itself. We anyway then
# store only DB stuff for gist
Session().commit()
h.flash(_('Successfully updated gist data'), category='success')
except Exception:
log.exception("Exception in gist edit")
h.flash(_('Error occurred during update of gist %s') % gist_id,
category='error')
return redirect(url('gist', gist_id=gist_id))
@LoginRequired()
@NotAnonymous()
def edit_form(self, gist_id, format='html'):
"""GET /admin/gists/gist_id/edit: Form to edit an existing item"""
# url('edit_gist', gist_id=ID)
self._add_gist_to_context(gist_id)
owner = c.gist.gist_owner == c.rhodecode_user.user_id
if not (h.HasPermissionAny('hg.admin')() or owner):
raise HTTPForbidden()
try:
c.file_last_commit, c.files = GistModel().get_gist_files(gist_id)
except VCSError:
log.exception("Exception in gist edit")
raise HTTPNotFound()
if c.gist.gist_expires == -1:
expiry = _('never')
else:
# this cannot use timeago, since it's used in select2 as a value
expiry = h.age(h.time_to_datetime(c.gist.gist_expires))
self.__load_defaults(
extra_values=('0', _('%(expiry)s - current value') % {'expiry': expiry}))
return render('admin/gists/edit.html')
@LoginRequired()
@NotAnonymous()
@jsonify
def check_revision(self, gist_id):
c.gist = Gist.get_or_404(gist_id)
last_rev = c.gist.scm_instance().get_commit()
success = True
revision = request.GET.get('revision')
##TODO: maybe move this to model ?
if revision != last_rev.raw_id:
log.error('Last revision %s is different then submitted %s'
% (revision, last_rev))
# our gist has newer version than we
success = False
return {'success': success}