##// END OF EJS Templates
gists: fixes for tests and cleanup code that is now handled by the form deserializer
super-admin -
r4996:7fcb67da default
parent child Browse files
Show More
@@ -1,391 +1,391 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.lib import helpers as h
25 25 from rhodecode.model.db import User, Gist
26 26 from rhodecode.model.gist import GistModel
27 27 from rhodecode.model.meta import Session
28 28 from rhodecode.tests import (
29 29 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
30 30 TestController, assert_session_flash)
31 31
32 32
33 33 def route_path(name, params=None, **kwargs):
34 34 import urllib.request, urllib.parse, urllib.error
35 35 from rhodecode.apps._base import ADMIN_PREFIX
36 36
37 37 base_url = {
38 38 'gists_show': ADMIN_PREFIX + '/gists',
39 39 'gists_new': ADMIN_PREFIX + '/gists/new',
40 40 'gists_create': ADMIN_PREFIX + '/gists/create',
41 41 'gist_show': ADMIN_PREFIX + '/gists/{gist_id}',
42 42 'gist_delete': ADMIN_PREFIX + '/gists/{gist_id}/delete',
43 43 'gist_edit': ADMIN_PREFIX + '/gists/{gist_id}/edit',
44 44 'gist_edit_check_revision': ADMIN_PREFIX + '/gists/{gist_id}/edit/check_revision',
45 45 'gist_update': ADMIN_PREFIX + '/gists/{gist_id}/update',
46 46 'gist_show_rev': ADMIN_PREFIX + '/gists/{gist_id}/rev/{revision}',
47 47 'gist_show_formatted': ADMIN_PREFIX + '/gists/{gist_id}/rev/{revision}/{format}',
48 48 'gist_show_formatted_path': ADMIN_PREFIX + '/gists/{gist_id}/rev/{revision}/{format}/{f_path}',
49 49
50 50 }[name].format(**kwargs)
51 51
52 52 if params:
53 53 base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
54 54 return base_url
55 55
56 56
57 57 class GistUtility(object):
58 58
59 59 def __init__(self):
60 60 self._gist_ids = []
61 61
62 62 def __call__(
63 63 self, f_name, content='some gist', lifetime=-1,
64 64 description='gist-desc', gist_type='public',
65 65 acl_level=Gist.GIST_PUBLIC, owner=TEST_USER_ADMIN_LOGIN):
66 66 gist_mapping = {
67 67 f_name: {'content': content}
68 68 }
69 69 user = User.get_by_username(owner)
70 70 gist = GistModel().create(
71 71 description, owner=user, gist_mapping=gist_mapping,
72 72 gist_type=gist_type, lifetime=lifetime, gist_acl_level=acl_level)
73 73 Session().commit()
74 74 self._gist_ids.append(gist.gist_id)
75 75 return gist
76 76
77 77 def cleanup(self):
78 78 for gist_id in self._gist_ids:
79 79 gist = Gist.get(gist_id)
80 80 if gist:
81 81 Session().delete(gist)
82 82
83 83 Session().commit()
84 84
85 85
86 86 @pytest.fixture()
87 87 def create_gist(request):
88 88 gist_utility = GistUtility()
89 89 request.addfinalizer(gist_utility.cleanup)
90 90 return gist_utility
91 91
92 92
93 93 class TestGistsController(TestController):
94 94
95 95 def test_index_empty(self, create_gist):
96 96 self.log_user()
97 97 response = self.app.get(route_path('gists_show'))
98 98 response.mustcontain('data: [],')
99 99
100 100 def test_index(self, create_gist):
101 101 self.log_user()
102 102 g1 = create_gist('gist1')
103 103 g2 = create_gist('gist2', lifetime=1400)
104 104 g3 = create_gist('gist3', description='gist3-desc')
105 105 g4 = create_gist('gist4', gist_type='private').gist_access_id
106 106 response = self.app.get(route_path('gists_show'))
107 107
108 108 response.mustcontain(g1.gist_access_id)
109 109 response.mustcontain(g2.gist_access_id)
110 110 response.mustcontain(g3.gist_access_id)
111 111 response.mustcontain('gist3-desc')
112 112 response.mustcontain(no=[g4])
113 113
114 114 # Expiration information should be visible
115 115 expires_tag = '%s' % h.age_component(
116 116 h.time_to_utcdatetime(g2.gist_expires))
117 117 response.mustcontain(expires_tag.replace('"', '\\"'))
118 118
119 119 def test_index_private_gists(self, create_gist):
120 120 self.log_user()
121 121 gist = create_gist('gist5', gist_type='private')
122 122 response = self.app.get(route_path('gists_show', params=dict(private=1)))
123 123
124 124 # and privates
125 125 response.mustcontain(gist.gist_access_id)
126 126
127 127 def test_index_show_all(self, create_gist):
128 128 self.log_user()
129 129 create_gist('gist1')
130 130 create_gist('gist2', lifetime=1400)
131 131 create_gist('gist3', description='gist3-desc')
132 132 create_gist('gist4', gist_type='private')
133 133
134 134 response = self.app.get(route_path('gists_show', params=dict(all=1)))
135 135
136 136 assert len(GistModel.get_all()) == 4
137 137 # and privates
138 138 for gist in GistModel.get_all():
139 139 response.mustcontain(gist.gist_access_id)
140 140
141 141 def test_index_show_all_hidden_from_regular(self, create_gist):
142 142 self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
143 143 create_gist('gist2', gist_type='private')
144 144 create_gist('gist3', gist_type='private')
145 145 create_gist('gist4', gist_type='private')
146 146
147 147 response = self.app.get(route_path('gists_show', params=dict(all=1)))
148 148
149 149 assert len(GistModel.get_all()) == 3
150 150 # since we don't have access to private in this view, we
151 151 # should see nothing
152 152 for gist in GistModel.get_all():
153 153 response.mustcontain(no=[gist.gist_access_id])
154 154
155 155 def test_create(self):
156 156 self.log_user()
157 157 response = self.app.post(
158 158 route_path('gists_create'),
159 159 params={'lifetime': -1,
160 160 'content': 'gist test',
161 161 'filename': 'foo',
162 162 'gist_type': 'public',
163 163 'gist_acl_level': Gist.ACL_LEVEL_PUBLIC,
164 164 'csrf_token': self.csrf_token},
165 165 status=302)
166 166 response = response.follow()
167 167 response.mustcontain('added file: foo')
168 168 response.mustcontain('gist test')
169 169
170 170 def test_create_with_path_with_dirs(self):
171 171 self.log_user()
172 172 response = self.app.post(
173 173 route_path('gists_create'),
174 174 params={'lifetime': -1,
175 175 'content': 'gist test',
176 176 'filename': '/home/foo',
177 177 'gist_type': 'public',
178 178 'gist_acl_level': Gist.ACL_LEVEL_PUBLIC,
179 179 'csrf_token': self.csrf_token},
180 180 status=200)
181 181 response.mustcontain('Filename /home/foo cannot be inside a directory')
182 182
183 183 def test_access_expired_gist(self, create_gist):
184 184 self.log_user()
185 185 gist = create_gist('never-see-me')
186 186 gist.gist_expires = 0 # 1970
187 187 Session().add(gist)
188 188 Session().commit()
189 189
190 190 self.app.get(route_path('gist_show', gist_id=gist.gist_access_id),
191 191 status=404)
192 192
193 193 def test_create_private(self):
194 194 self.log_user()
195 195 response = self.app.post(
196 196 route_path('gists_create'),
197 197 params={'lifetime': -1,
198 198 'content': 'private gist test',
199 199 'filename': 'private-foo',
200 200 'gist_type': 'private',
201 201 'gist_acl_level': Gist.ACL_LEVEL_PUBLIC,
202 202 'csrf_token': self.csrf_token},
203 203 status=302)
204 204 response = response.follow()
205 205 response.mustcontain('added file: private-foo<')
206 206 response.mustcontain('private gist test')
207 207 response.mustcontain('Private Gist')
208 208 # Make sure private gists are not indexed by robots
209 209 response.mustcontain(
210 210 '<meta name="robots" content="noindex, nofollow">')
211 211
212 212 def test_create_private_acl_private(self):
213 213 self.log_user()
214 214 response = self.app.post(
215 215 route_path('gists_create'),
216 216 params={'lifetime': -1,
217 217 'content': 'private gist test',
218 218 'filename': 'private-foo',
219 219 'gist_type': 'private',
220 220 'gist_acl_level': Gist.ACL_LEVEL_PRIVATE,
221 221 'csrf_token': self.csrf_token},
222 222 status=302)
223 223 response = response.follow()
224 224 response.mustcontain('added file: private-foo<')
225 225 response.mustcontain('private gist test')
226 226 response.mustcontain('Private Gist')
227 227 # Make sure private gists are not indexed by robots
228 228 response.mustcontain(
229 229 '<meta name="robots" content="noindex, nofollow">')
230 230
231 231 def test_create_with_description(self):
232 232 self.log_user()
233 233 response = self.app.post(
234 234 route_path('gists_create'),
235 235 params={'lifetime': -1,
236 236 'content': 'gist test',
237 237 'filename': 'foo-desc',
238 238 'description': 'gist-desc',
239 239 'gist_type': 'public',
240 240 'gist_acl_level': Gist.ACL_LEVEL_PUBLIC,
241 241 'csrf_token': self.csrf_token},
242 242 status=302)
243 243 response = response.follow()
244 244 response.mustcontain('added file: foo-desc')
245 245 response.mustcontain('gist test')
246 246 response.mustcontain('gist-desc')
247 247
248 248 def test_create_public_with_anonymous_access(self):
249 249 self.log_user()
250 250 params = {
251 251 'lifetime': -1,
252 252 'content': 'gist test',
253 253 'filename': 'foo-desc',
254 254 'description': 'gist-desc',
255 255 'gist_type': 'public',
256 256 'gist_acl_level': Gist.ACL_LEVEL_PUBLIC,
257 257 'csrf_token': self.csrf_token
258 258 }
259 259 response = self.app.post(
260 260 route_path('gists_create'), params=params, status=302)
261 261 self.logout_user()
262 262 response = response.follow()
263 263 response.mustcontain('added file: foo-desc')
264 264 response.mustcontain('gist test')
265 265 response.mustcontain('gist-desc')
266 266
267 267 def test_new(self):
268 268 self.log_user()
269 269 self.app.get(route_path('gists_new'))
270 270
271 271 def test_delete(self, create_gist):
272 272 self.log_user()
273 273 gist = create_gist('delete-me')
274 274 response = self.app.post(
275 275 route_path('gist_delete', gist_id=gist.gist_id),
276 276 params={'csrf_token': self.csrf_token})
277 277 assert_session_flash(response, 'Deleted gist %s' % gist.gist_id)
278 278
279 279 def test_delete_normal_user_his_gist(self, create_gist):
280 280 self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
281 281 gist = create_gist('delete-me', owner=TEST_USER_REGULAR_LOGIN)
282 282
283 283 response = self.app.post(
284 284 route_path('gist_delete', gist_id=gist.gist_id),
285 285 params={'csrf_token': self.csrf_token})
286 286 assert_session_flash(response, 'Deleted gist %s' % gist.gist_id)
287 287
288 288 def test_delete_normal_user_not_his_own_gist(self, create_gist):
289 289 self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
290 290 gist = create_gist('delete-me-2')
291 291
292 292 self.app.post(
293 293 route_path('gist_delete', gist_id=gist.gist_id),
294 294 params={'csrf_token': self.csrf_token}, status=404)
295 295
296 296 def test_show(self, create_gist):
297 297 gist = create_gist('gist-show-me')
298 298 response = self.app.get(route_path('gist_show', gist_id=gist.gist_access_id))
299 299
300 300 response.mustcontain('added file: gist-show-me<')
301 301
302 302 assert_response = response.assert_response()
303 303 assert_response.element_equals_to(
304 304 'div.rc-user span.user',
305 305 '<a href="/_profiles/test_admin">test_admin</a>')
306 306
307 307 response.mustcontain('gist-desc')
308 308
309 309 def test_show_without_hg(self, create_gist):
310 310 with mock.patch(
311 311 'rhodecode.lib.vcs.settings.ALIASES', ['git']):
312 312 gist = create_gist('gist-show-me-again')
313 313 self.app.get(
314 314 route_path('gist_show', gist_id=gist.gist_access_id), status=200)
315 315
316 316 def test_show_acl_private(self, create_gist):
317 317 gist = create_gist('gist-show-me-only-when-im-logged-in',
318 318 acl_level=Gist.ACL_LEVEL_PRIVATE)
319 319 self.app.get(
320 320 route_path('gist_show', gist_id=gist.gist_access_id), status=404)
321 321
322 322 # now we log-in we should see thi gist
323 323 self.log_user()
324 324 response = self.app.get(
325 325 route_path('gist_show', gist_id=gist.gist_access_id))
326 326 response.mustcontain('added file: gist-show-me-only-when-im-logged-in')
327 327
328 328 assert_response = response.assert_response()
329 329 assert_response.element_equals_to(
330 330 'div.rc-user span.user',
331 331 '<a href="/_profiles/test_admin">test_admin</a>')
332 332 response.mustcontain('gist-desc')
333 333
334 334 def test_show_as_raw(self, create_gist):
335 335 gist = create_gist('gist-show-me', content='GIST CONTENT')
336 336 response = self.app.get(
337 337 route_path('gist_show_formatted',
338 338 gist_id=gist.gist_access_id, revision='tip',
339 339 format='raw'))
340 assert response.body == 'GIST CONTENT'
340 assert response.text == 'GIST CONTENT'
341 341
342 342 def test_show_as_raw_individual_file(self, create_gist):
343 343 gist = create_gist('gist-show-me-raw', content='GIST BODY')
344 344 response = self.app.get(
345 345 route_path('gist_show_formatted_path',
346 346 gist_id=gist.gist_access_id, format='raw',
347 347 revision='tip', f_path='gist-show-me-raw'))
348 assert response.body == 'GIST BODY'
348 assert response.text == 'GIST BODY'
349 349
350 350 def test_edit_page(self, create_gist):
351 351 self.log_user()
352 352 gist = create_gist('gist-for-edit', content='GIST EDIT BODY')
353 353 response = self.app.get(route_path('gist_edit', gist_id=gist.gist_access_id))
354 354 response.mustcontain('GIST EDIT BODY')
355 355
356 356 def test_edit_page_non_logged_user(self, create_gist):
357 357 gist = create_gist('gist-for-edit', content='GIST EDIT BODY')
358 358 self.app.get(route_path('gist_edit', gist_id=gist.gist_access_id),
359 359 status=302)
360 360
361 361 def test_edit_normal_user_his_gist(self, create_gist):
362 362 self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
363 363 gist = create_gist('gist-for-edit', owner=TEST_USER_REGULAR_LOGIN)
364 364 self.app.get(route_path('gist_edit', gist_id=gist.gist_access_id,
365 365 status=200))
366 366
367 367 def test_edit_normal_user_not_his_own_gist(self, create_gist):
368 368 self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
369 369 gist = create_gist('delete-me')
370 370 self.app.get(route_path('gist_edit', gist_id=gist.gist_access_id),
371 371 status=404)
372 372
373 373 def test_user_first_name_is_escaped(self, user_util, create_gist):
374 374 xss_atack_string = '"><script>alert(\'First Name\')</script>'
375 375 xss_escaped_string = h.html_escape(h.escape(xss_atack_string))
376 376 password = 'test'
377 377 user = user_util.create_user(
378 378 firstname=xss_atack_string, password=password)
379 379 create_gist('gist', gist_type='public', owner=user.username)
380 380 response = self.app.get(route_path('gists_show'))
381 381 response.mustcontain(xss_escaped_string)
382 382
383 383 def test_user_last_name_is_escaped(self, user_util, create_gist):
384 384 xss_atack_string = '"><script>alert(\'Last Name\')</script>'
385 385 xss_escaped_string = h.html_escape(h.escape(xss_atack_string))
386 386 password = 'test'
387 387 user = user_util.create_user(
388 388 lastname=xss_atack_string, password=password)
389 389 create_gist('gist', gist_type='public', owner=user.username)
390 390 response = self.app.get(route_path('gists_show'))
391 391 response.mustcontain(xss_escaped_string)
@@ -1,386 +1,378 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2013-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import logging
23 23
24 24 import formencode
25 25 import formencode.htmlfill
26 26 import peppercorn
27 27
28 28 from pyramid.httpexceptions import HTTPNotFound, HTTPFound, HTTPBadRequest
29 29 from pyramid.renderers import render
30 30 from pyramid.response import Response
31 31
32 32 from rhodecode.apps._base import BaseAppView
33 33 from rhodecode.lib import helpers as h, ext_json
34 34 from rhodecode.lib.auth import LoginRequired, NotAnonymous, CSRFRequired
35 35 from rhodecode.lib.utils2 import time_to_datetime
36 36 from rhodecode.lib.ext_json import json
37 37 from rhodecode.lib.vcs.exceptions import VCSError, NodeNotChangedError
38 38 from rhodecode.model.gist import GistModel
39 39 from rhodecode.model.meta import Session
40 40 from rhodecode.model.db import Gist, User, or_
41 41 from rhodecode.model import validation_schema
42 42 from rhodecode.model.validation_schema.schemas import gist_schema
43 43
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 class GistView(BaseAppView):
49 49
50 50 def load_default_context(self):
51 51 _ = self.request.translate
52 52 c = self._get_local_tmpl_context()
53 53 c.user = c.auth_user.get_instance()
54 54
55 55 c.lifetime_values = [
56 56 (-1, _('forever')),
57 57 (5, _('5 minutes')),
58 58 (60, _('1 hour')),
59 59 (60 * 24, _('1 day')),
60 60 (60 * 24 * 30, _('1 month')),
61 61 ]
62 62
63 63 c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
64 64 c.acl_options = [
65 65 (Gist.ACL_LEVEL_PRIVATE, _("Requires registered account")),
66 66 (Gist.ACL_LEVEL_PUBLIC, _("Can be accessed by anonymous users"))
67 67 ]
68 68
69 69 return c
70 70
71 71 @LoginRequired()
72 72 def gist_show_all(self):
73 73 c = self.load_default_context()
74 74
75 75 not_default_user = self._rhodecode_user.username != User.DEFAULT_USER
76 76 c.show_private = self.request.GET.get('private') and not_default_user
77 77 c.show_public = self.request.GET.get('public') and not_default_user
78 78 c.show_all = self.request.GET.get('all') and self._rhodecode_user.admin
79 79
80 80 gists = _gists = Gist().query()\
81 81 .filter(or_(Gist.gist_expires == -1, Gist.gist_expires >= time.time()))\
82 82 .order_by(Gist.created_on.desc())
83 83
84 84 c.active = 'public'
85 85 # MY private
86 86 if c.show_private and not c.show_public:
87 87 gists = _gists.filter(Gist.gist_type == Gist.GIST_PRIVATE)\
88 88 .filter(Gist.gist_owner == self._rhodecode_user.user_id)
89 89 c.active = 'my_private'
90 90 # MY public
91 91 elif c.show_public and not c.show_private:
92 92 gists = _gists.filter(Gist.gist_type == Gist.GIST_PUBLIC)\
93 93 .filter(Gist.gist_owner == self._rhodecode_user.user_id)
94 94 c.active = 'my_public'
95 95 # MY public+private
96 96 elif c.show_private and c.show_public:
97 97 gists = _gists.filter(or_(Gist.gist_type == Gist.GIST_PUBLIC,
98 98 Gist.gist_type == Gist.GIST_PRIVATE))\
99 99 .filter(Gist.gist_owner == self._rhodecode_user.user_id)
100 100 c.active = 'my_all'
101 101 # Show all by super-admin
102 102 elif c.show_all:
103 103 c.active = 'all'
104 104 gists = _gists
105 105
106 106 # default show ALL public gists
107 107 if not c.show_public and not c.show_private and not c.show_all:
108 108 gists = _gists.filter(Gist.gist_type == Gist.GIST_PUBLIC)
109 109 c.active = 'public'
110 110
111 111 _render = self.request.get_partial_renderer(
112 112 'rhodecode:templates/data_table/_dt_elements.mako')
113 113
114 114 data = []
115 115
116 116 for gist in gists:
117 117 data.append({
118 118 'created_on': _render('gist_created', gist.created_on),
119 119 'created_on_raw': gist.created_on,
120 120 'type': _render('gist_type', gist.gist_type),
121 121 'access_id': _render('gist_access_id', gist.gist_access_id, gist.owner.full_contact),
122 122 'author': _render('gist_author', gist.owner.full_contact, gist.created_on, gist.gist_expires),
123 123 'author_raw': h.escape(gist.owner.full_contact),
124 124 'expires': _render('gist_expires', gist.gist_expires),
125 125 'description': _render('gist_description', gist.gist_description)
126 126 })
127 127 c.data = ext_json.str_json(data)
128 128
129 129 return self._get_template_context(c)
130 130
131 131 @LoginRequired()
132 132 @NotAnonymous()
133 133 def gist_new(self):
134 134 c = self.load_default_context()
135 135 return self._get_template_context(c)
136 136
137 137 @LoginRequired()
138 138 @NotAnonymous()
139 139 @CSRFRequired()
140 140 def gist_create(self):
141 141 _ = self.request.translate
142 142 c = self.load_default_context()
143 143
144 144 data = dict(self.request.POST)
145 data['filename'] = data.get('filename') or Gist.DEFAULT_FILENAME
145
146 filename = data.pop('filename', '') or Gist.DEFAULT_FILENAME
146 147
147 148 data['nodes'] = [{
148 'filename': data['filename'],
149 'content': data.get('content'),
150 'mimetype': data.get('mimetype') # None is autodetect
149 'filename': filename,
150 'content': data.pop('content', ''),
151 'mimetype': data.pop('mimetype', None) # None is autodetect
151 152 }]
152 gist_type = {
153 'public': Gist.GIST_PUBLIC,
154 'private': Gist.GIST_PRIVATE
155 }.get(data.get('gist_type')) or Gist.GIST_PRIVATE
156
157 data['gist_type'] = gist_type
158
159 data['gist_acl_level'] = (
160 data.get('gist_acl_level') or Gist.ACL_LEVEL_PRIVATE)
161 153
162 154 schema = gist_schema.GistSchema().bind(
163 155 lifetime_options=[x[0] for x in c.lifetime_values])
164 156
165 157 try:
166 158
167 159 schema_data = schema.deserialize(data)
160
168 161 # convert to safer format with just KEYs so we sure no duplicates
169 schema_data['nodes'] = gist_schema.sequence_to_nodes(
170 schema_data['nodes'])
162 schema_data['nodes'] = gist_schema.sequence_to_nodes(schema_data['nodes'])
171 163
172 164 gist = GistModel().create(
173 165 gist_id=schema_data['gistid'], # custom access id not real ID
174 166 description=schema_data['description'],
175 167 owner=self._rhodecode_user.user_id,
176 168 gist_mapping=schema_data['nodes'],
177 169 gist_type=schema_data['gist_type'],
178 170 lifetime=schema_data['lifetime'],
179 171 gist_acl_level=schema_data['gist_acl_level']
180 172 )
181 173 Session().commit()
182 174 new_gist_id = gist.gist_access_id
183 175 except validation_schema.Invalid as errors:
184 176 defaults = data
185 177 errors = errors.asdict()
186 178
187 179 if 'nodes.0.content' in errors:
188 180 errors['content'] = errors['nodes.0.content']
189 181 del errors['nodes.0.content']
190 182 if 'nodes.0.filename' in errors:
191 183 errors['filename'] = errors['nodes.0.filename']
192 184 del errors['nodes.0.filename']
193 185
194 186 data = render('rhodecode:templates/admin/gists/gist_new.mako',
195 187 self._get_template_context(c), self.request)
196 188 html = formencode.htmlfill.render(
197 189 data,
198 190 defaults=defaults,
199 191 errors=errors,
200 192 prefix_error=False,
201 193 encoding="UTF-8",
202 194 force_defaults=False
203 195 )
204 196 return Response(html)
205 197
206 198 except Exception:
207 199 log.exception("Exception while trying to create a gist")
208 200 h.flash(_('Error occurred during gist creation'), category='error')
209 201 raise HTTPFound(h.route_url('gists_new'))
210 202 raise HTTPFound(h.route_url('gist_show', gist_id=new_gist_id))
211 203
212 204 @LoginRequired()
213 205 @NotAnonymous()
214 206 @CSRFRequired()
215 207 def gist_delete(self):
216 208 _ = self.request.translate
217 209 gist_id = self.request.matchdict['gist_id']
218 210
219 211 c = self.load_default_context()
220 212 c.gist = Gist.get_or_404(gist_id)
221 213
222 214 owner = c.gist.gist_owner == self._rhodecode_user.user_id
223 215 if not (h.HasPermissionAny('hg.admin')() or owner):
224 216 log.warning('Deletion of Gist was forbidden '
225 217 'by unauthorized user: `%s`', self._rhodecode_user)
226 218 raise HTTPNotFound()
227 219
228 220 GistModel().delete(c.gist)
229 221 Session().commit()
230 222 h.flash(_('Deleted gist %s') % c.gist.gist_access_id, category='success')
231 223
232 224 raise HTTPFound(h.route_url('gists_show'))
233 225
234 226 def _get_gist(self, gist_id):
235 227
236 228 gist = Gist.get_or_404(gist_id)
237 229
238 230 # Check if this gist is expired
239 231 if gist.gist_expires != -1:
240 232 if time.time() > gist.gist_expires:
241 233 log.error(
242 234 'Gist expired at %s', time_to_datetime(gist.gist_expires))
243 235 raise HTTPNotFound()
244 236
245 237 # check if this gist requires a login
246 238 is_default_user = self._rhodecode_user.username == User.DEFAULT_USER
247 239 if gist.acl_level == Gist.ACL_LEVEL_PRIVATE and is_default_user:
248 240 log.error("Anonymous user %s tried to access protected gist `%s`",
249 241 self._rhodecode_user, gist_id)
250 242 raise HTTPNotFound()
251 243 return gist
252 244
253 245 @LoginRequired()
254 246 def gist_show(self):
255 247 gist_id = self.request.matchdict['gist_id']
256 248
257 249 # TODO(marcink): expose those via matching dict
258 250 revision = self.request.matchdict.get('revision', 'tip')
259 251 f_path = self.request.matchdict.get('f_path', None)
260 252 return_format = self.request.matchdict.get('format')
261 253
262 254 c = self.load_default_context()
263 255 c.gist = self._get_gist(gist_id)
264 256 c.render = not self.request.GET.get('no-render', False)
265 257
266 258 try:
267 259 c.file_last_commit, c.files = GistModel().get_gist_files(
268 260 gist_id, revision=revision)
269 261 except VCSError:
270 262 log.exception("Exception in gist show")
271 263 raise HTTPNotFound()
272 264
273 265 if return_format == 'raw':
274 266 content = '\n\n'.join([f.content for f in c.files
275 267 if (f_path is None or f.path == f_path)])
276 268 response = Response(content)
277 269 response.content_type = 'text/plain'
278 270 return response
279 271 elif return_format:
280 272 raise HTTPBadRequest()
281 273
282 274 return self._get_template_context(c)
283 275
284 276 @LoginRequired()
285 277 @NotAnonymous()
286 278 def gist_edit(self):
287 279 _ = self.request.translate
288 280 gist_id = self.request.matchdict['gist_id']
289 281 c = self.load_default_context()
290 282 c.gist = self._get_gist(gist_id)
291 283
292 284 owner = c.gist.gist_owner == self._rhodecode_user.user_id
293 285 if not (h.HasPermissionAny('hg.admin')() or owner):
294 286 raise HTTPNotFound()
295 287
296 288 try:
297 289 c.file_last_commit, c.files = GistModel().get_gist_files(gist_id)
298 290 except VCSError:
299 291 log.exception("Exception in gist edit")
300 292 raise HTTPNotFound()
301 293
302 294 if c.gist.gist_expires == -1:
303 295 expiry = _('never')
304 296 else:
305 297 # this cannot use timeago, since it's used in select2 as a value
306 298 expiry = h.age(h.time_to_datetime(c.gist.gist_expires))
307 299
308 300 c.lifetime_values.append(
309 301 (0, _('%(expiry)s - current value') % {'expiry': _(expiry)})
310 302 )
311 303
312 304 return self._get_template_context(c)
313 305
314 306 @LoginRequired()
315 307 @NotAnonymous()
316 308 @CSRFRequired()
317 309 def gist_update(self):
318 310 _ = self.request.translate
319 311 gist_id = self.request.matchdict['gist_id']
320 312 c = self.load_default_context()
321 313 c.gist = self._get_gist(gist_id)
322 314
323 315 owner = c.gist.gist_owner == self._rhodecode_user.user_id
324 316 if not (h.HasPermissionAny('hg.admin')() or owner):
325 317 raise HTTPNotFound()
326 318
327 319 data = peppercorn.parse(self.request.POST.items())
328 320
329 321 schema = gist_schema.GistSchema()
330 322 schema = schema.bind(
331 323 # '0' is special value to leave lifetime untouched
332 324 lifetime_options=[x[0] for x in c.lifetime_values] + [0],
333 325 )
334 326
335 327 try:
336 328 schema_data = schema.deserialize(data)
337 329 # convert to safer format with just KEYs so we sure no duplicates
338 330 schema_data['nodes'] = gist_schema.sequence_to_nodes(
339 331 schema_data['nodes'])
340 332
341 333 GistModel().update(
342 334 gist=c.gist,
343 335 description=schema_data['description'],
344 336 owner=c.gist.owner,
345 337 gist_mapping=schema_data['nodes'],
346 338 lifetime=schema_data['lifetime'],
347 339 gist_acl_level=schema_data['gist_acl_level']
348 340 )
349 341
350 342 Session().commit()
351 343 h.flash(_('Successfully updated gist content'), category='success')
352 344 except NodeNotChangedError:
353 345 # raised if nothing was changed in repo itself. We anyway then
354 346 # store only DB stuff for gist
355 347 Session().commit()
356 348 h.flash(_('Successfully updated gist data'), category='success')
357 349 except validation_schema.Invalid as errors:
358 350 errors = h.escape(errors.asdict())
359 351 h.flash(_('Error occurred during update of gist {}: {}').format(
360 352 gist_id, errors), category='error')
361 353 except Exception:
362 354 log.exception("Exception in gist edit")
363 355 h.flash(_('Error occurred during update of gist %s') % gist_id,
364 356 category='error')
365 357
366 358 raise HTTPFound(h.route_url('gist_show', gist_id=gist_id))
367 359
368 360 @LoginRequired()
369 361 @NotAnonymous()
370 362 def gist_edit_check_revision(self):
371 363 _ = self.request.translate
372 364 gist_id = self.request.matchdict['gist_id']
373 365 c = self.load_default_context()
374 366 c.gist = self._get_gist(gist_id)
375 367
376 368 last_rev = c.gist.scm_instance().get_commit()
377 369 success = True
378 370 revision = self.request.GET.get('revision')
379 371
380 372 if revision != last_rev.raw_id:
381 373 log.error('Last revision %s is different then submitted %s',
382 374 revision, last_rev)
383 375 # our gist has newer version than we
384 376 success = False
385 377
386 378 return {'success': success}
General Comments 0
You need to be logged in to leave comments. Login now