##// END OF EJS Templates
repo group: introduce editing of owner...
Mads Kiilerich -
r8734:abc29122 stable
parent child Browse files
Show More
@@ -1,402 +1,403 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.controllers.admin.repo_groups
16 16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
17 17
18 18 Repository groups controller for Kallithea
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Mar 23, 2010
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28 import logging
29 29 import traceback
30 30
31 31 import formencode
32 32 from formencode import htmlfill
33 33 from tg import app_globals, request
34 34 from tg import tmpl_context as c
35 35 from tg.i18n import ugettext as _
36 36 from tg.i18n import ungettext
37 37 from webob.exc import HTTPForbidden, HTTPFound, HTTPInternalServerError, HTTPNotFound
38 38
39 39 from kallithea.controllers import base
40 40 from kallithea.lib import webutils
41 41 from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoGroupPermissionLevelDecorator, LoginRequired
42 42 from kallithea.lib.utils2 import safe_int
43 43 from kallithea.lib.webutils import url
44 44 from kallithea.model import db, meta
45 45 from kallithea.model.forms import RepoGroupForm, RepoGroupPermsForm
46 46 from kallithea.model.repo import RepoModel
47 47 from kallithea.model.repo_group import RepoGroupModel
48 48 from kallithea.model.scm import AvailableRepoGroupChoices, RepoGroupList
49 49
50 50
51 51 log = logging.getLogger(__name__)
52 52
53 53
54 54 class RepoGroupsController(base.BaseController):
55 55
56 56 @LoginRequired(allow_default_user=True)
57 57 def _before(self, *args, **kwargs):
58 58 super(RepoGroupsController, self)._before(*args, **kwargs)
59 59
60 60 def __load_defaults(self, extras=(), exclude=()):
61 61 """extras is used for keeping current parent ignoring permissions
62 62 exclude is used for not moving group to itself TODO: also exclude descendants
63 63 Note: only admin can create top level groups
64 64 """
65 65 repo_groups = AvailableRepoGroupChoices('admin', extras)
66 66 exclude_group_ids = set(rg.group_id for rg in exclude)
67 67 c.repo_groups = [rg for rg in repo_groups
68 68 if rg[0] not in exclude_group_ids]
69 69
70 70 def __load_data(self, group_id):
71 71 """
72 72 Load defaults settings for edit, and update
73 73
74 74 :param group_id:
75 75 """
76 76 repo_group = db.RepoGroup.get_or_404(group_id)
77 77 data = repo_group.get_dict()
78 78 data['group_name'] = repo_group.name
79 data['owner'] = repo_group.owner.username
79 80
80 81 # fill repository group users
81 82 for p in repo_group.repo_group_to_perm:
82 83 data.update({'u_perm_%s' % p.user.username:
83 84 p.permission.permission_name})
84 85
85 86 # fill repository group groups
86 87 for p in repo_group.users_group_to_perm:
87 88 data.update({'g_perm_%s' % p.users_group.users_group_name:
88 89 p.permission.permission_name})
89 90
90 91 return data
91 92
92 93 def _revoke_perms_on_yourself(self, form_result):
93 94 _up = [u for u in form_result['perms_updates'] if request.authuser.username == u[0]]
94 95 _new = [u for u in form_result['perms_new'] if request.authuser.username == u[0]]
95 96 if _new and _new[0][1] != 'group.admin' or _up and _up[0][1] != 'group.admin':
96 97 return True
97 98 return False
98 99
99 100 def index(self, format='html'):
100 101 _list = db.RepoGroup.query(sorted=True).all()
101 102 group_iter = RepoGroupList(_list, perm_level='admin')
102 103 repo_groups_data = []
103 104 _tmpl_lookup = app_globals.mako_lookup
104 105 template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
105 106
106 107 def repo_group_name(repo_group_name, children_groups):
107 108 return template.get_def("repo_group_name") \
108 109 .render_unicode(repo_group_name, children_groups, _=_, webutils=webutils, c=c)
109 110
110 111 def repo_group_actions(repo_group_id, repo_group_name, gr_count):
111 112 return template.get_def("repo_group_actions") \
112 113 .render_unicode(repo_group_id, repo_group_name, gr_count, _=_, webutils=webutils, c=c,
113 114 ungettext=ungettext)
114 115
115 116 for repo_gr in group_iter:
116 117 children_groups = [g.name for g in repo_gr.parents] + [repo_gr.name]
117 118 repo_count = repo_gr.repositories.count()
118 119 repo_groups_data.append({
119 120 "raw_name": webutils.escape(repo_gr.group_name),
120 121 "group_name": repo_group_name(repo_gr.group_name, children_groups),
121 122 "desc": webutils.escape(repo_gr.group_description),
122 123 "repos": repo_count,
123 124 "owner": repo_gr.owner.username,
124 125 "action": repo_group_actions(repo_gr.group_id, repo_gr.group_name,
125 126 repo_count)
126 127 })
127 128
128 129 c.data = {
129 130 "sort": None,
130 131 "dir": "asc",
131 132 "records": repo_groups_data
132 133 }
133 134
134 135 return base.render('admin/repo_groups/repo_groups.html')
135 136
136 137 def create(self):
137 138 self.__load_defaults()
138 139
139 140 # permissions for can create group based on parent_id are checked
140 141 # here in the Form
141 142 repo_group_form = RepoGroupForm(repo_groups=c.repo_groups)
142 143 form_result = None
143 144 try:
144 145 form_result = repo_group_form.to_python(dict(request.POST))
145 146 gr = RepoGroupModel().create(
146 147 group_name=form_result['group_name'],
147 148 group_description=form_result['group_description'],
148 149 parent=form_result['parent_group_id'],
149 owner=request.authuser.user_id, # TODO: make editable
150 owner=request.authuser.user_id,
150 151 copy_permissions=form_result['group_copy_permissions']
151 152 )
152 153 meta.Session().commit()
153 154 # TODO: in future action_logger(, '', '', '')
154 155 except formencode.Invalid as errors:
155 156 return htmlfill.render(
156 157 base.render('admin/repo_groups/repo_group_add.html'),
157 158 defaults=errors.value,
158 159 errors=errors.error_dict or {},
159 160 prefix_error=False,
160 161 encoding="UTF-8",
161 162 force_defaults=False)
162 163 except Exception:
163 164 log.error(traceback.format_exc())
164 165 webutils.flash(_('Error occurred during creation of repository group %s')
165 166 % request.POST.get('group_name'), category='error')
166 167 if form_result is None:
167 168 raise
168 169 parent_group_id = form_result['parent_group_id']
169 170 # TODO: maybe we should get back to the main view, not the admin one
170 171 raise HTTPFound(location=url('repos_groups', parent_group=parent_group_id))
171 172 webutils.flash(_('Created repository group %s') % gr.group_name,
172 173 category='success')
173 174 raise HTTPFound(location=url('repos_group_home', group_name=gr.group_name))
174 175
175 176 def new(self):
176 177 parent_group_id = safe_int(request.GET.get('parent_group') or '-1')
177 178 if HasPermissionAny('hg.admin')('group create'):
178 179 # we're global admin, we're ok and we can create TOP level groups
179 180 pass
180 181 else:
181 182 # we pass in parent group into creation form, thus we know
182 183 # what would be the group, we can check perms here !
183 184 group = db.RepoGroup.get(parent_group_id) if parent_group_id else None
184 185 group_name = group.group_name if group else None
185 186 if HasRepoGroupPermissionLevel('admin')(group_name, 'group create'):
186 187 pass
187 188 else:
188 189 raise HTTPForbidden()
189 190
190 191 self.__load_defaults()
191 192 return htmlfill.render(
192 193 base.render('admin/repo_groups/repo_group_add.html'),
193 194 defaults={'parent_group_id': parent_group_id},
194 195 errors={},
195 196 prefix_error=False,
196 197 encoding="UTF-8",
197 198 force_defaults=False)
198 199
199 200 @HasRepoGroupPermissionLevelDecorator('admin')
200 201 def update(self, group_name):
201 202 c.repo_group = db.RepoGroup.guess_instance(group_name)
202 203 self.__load_defaults(extras=[c.repo_group.parent_group],
203 204 exclude=[c.repo_group])
204 205
205 206 # TODO: kill allow_empty_group - it is only used for redundant form validation!
206 207 if HasPermissionAny('hg.admin')('group edit'):
207 208 # we're global admin, we're ok and we can create TOP level groups
208 209 allow_empty_group = True
209 210 elif not c.repo_group.parent_group:
210 211 allow_empty_group = True
211 212 else:
212 213 allow_empty_group = False
213 214 repo_group_form = RepoGroupForm(
214 215 edit=True,
215 216 old_data=c.repo_group.get_dict(),
216 217 repo_groups=c.repo_groups,
217 218 can_create_in_root=allow_empty_group,
218 219 )()
219 220 try:
220 221 form_result = repo_group_form.to_python(dict(request.POST))
221 222
222 223 new_gr = RepoGroupModel().update(group_name, form_result)
223 224 meta.Session().commit()
224 225 webutils.flash(_('Updated repository group %s')
225 226 % form_result['group_name'], category='success')
226 227 # we now have new name !
227 228 group_name = new_gr.group_name
228 229 # TODO: in future action_logger(, '', '', '')
229 230 except formencode.Invalid as errors:
230 231 c.active = 'settings'
231 232 return htmlfill.render(
232 233 base.render('admin/repo_groups/repo_group_edit.html'),
233 234 defaults=errors.value,
234 235 errors=errors.error_dict or {},
235 236 prefix_error=False,
236 237 encoding="UTF-8",
237 238 force_defaults=False)
238 239 except Exception:
239 240 log.error(traceback.format_exc())
240 241 webutils.flash(_('Error occurred during update of repository group %s')
241 242 % request.POST.get('group_name'), category='error')
242 243
243 244 raise HTTPFound(location=url('edit_repo_group', group_name=group_name))
244 245
245 246 @HasRepoGroupPermissionLevelDecorator('admin')
246 247 def delete(self, group_name):
247 248 gr = c.repo_group = db.RepoGroup.guess_instance(group_name)
248 249 parent_group = gr.parent_group
249 250 repos = gr.repositories.all()
250 251 if repos:
251 252 webutils.flash(_('This group contains %s repositories and cannot be '
252 253 'deleted') % len(repos), category='warning')
253 254 raise HTTPFound(location=url('repos_groups'))
254 255
255 256 children = gr.children.all()
256 257 if children:
257 258 webutils.flash(_('This group contains %s subgroups and cannot be deleted'
258 259 % (len(children))), category='warning')
259 260 raise HTTPFound(location=url('repos_groups'))
260 261
261 262 try:
262 263 RepoGroupModel().delete(group_name)
263 264 meta.Session().commit()
264 265 webutils.flash(_('Removed repository group %s') % group_name,
265 266 category='success')
266 267 # TODO: in future action_logger(, '', '', '')
267 268 except Exception:
268 269 log.error(traceback.format_exc())
269 270 webutils.flash(_('Error occurred during deletion of repository group %s')
270 271 % group_name, category='error')
271 272
272 273 if parent_group:
273 274 raise HTTPFound(location=url('repos_group_home', group_name=parent_group.group_name))
274 275 raise HTTPFound(location=url('repos_groups'))
275 276
276 277 def show_by_name(self, group_name):
277 278 """
278 279 This is a proxy that does a lookup group_name -> id, and shows
279 280 the group by id view instead
280 281 """
281 282 group_name = group_name.rstrip('/')
282 283 id_ = db.RepoGroup.get_by_group_name(group_name)
283 284 if id_:
284 285 return self.show(group_name)
285 286 raise HTTPNotFound
286 287
287 288 @HasRepoGroupPermissionLevelDecorator('read')
288 289 def show(self, group_name):
289 290 c.active = 'settings'
290 291
291 292 c.group = c.repo_group = db.RepoGroup.guess_instance(group_name)
292 293
293 294 groups = db.RepoGroup.query(sorted=True).filter_by(parent_group=c.group).all()
294 295 repo_groups_list = self.scm_model.get_repo_groups(groups)
295 296
296 297 repos_list = db.Repository.query(sorted=True).filter_by(group=c.group).all()
297 298 c.data = RepoModel().get_repos_as_dict(repos_list,
298 299 repo_groups_list=repo_groups_list,
299 300 short_name=True)
300 301
301 302 return base.render('admin/repo_groups/repo_group_show.html')
302 303
303 304 @HasRepoGroupPermissionLevelDecorator('admin')
304 305 def edit(self, group_name):
305 306 c.active = 'settings'
306 307
307 308 c.repo_group = db.RepoGroup.guess_instance(group_name)
308 309 self.__load_defaults(extras=[c.repo_group.parent_group],
309 310 exclude=[c.repo_group])
310 311 defaults = self.__load_data(c.repo_group.group_id)
311 312
312 313 return htmlfill.render(
313 314 base.render('admin/repo_groups/repo_group_edit.html'),
314 315 defaults=defaults,
315 316 encoding="UTF-8",
316 317 force_defaults=False
317 318 )
318 319
319 320 @HasRepoGroupPermissionLevelDecorator('admin')
320 321 def edit_repo_group_advanced(self, group_name):
321 322 c.active = 'advanced'
322 323 c.repo_group = db.RepoGroup.guess_instance(group_name)
323 324
324 325 return base.render('admin/repo_groups/repo_group_edit.html')
325 326
326 327 @HasRepoGroupPermissionLevelDecorator('admin')
327 328 def edit_repo_group_perms(self, group_name):
328 329 c.active = 'perms'
329 330 c.repo_group = db.RepoGroup.guess_instance(group_name)
330 331 self.__load_defaults()
331 332 defaults = self.__load_data(c.repo_group.group_id)
332 333
333 334 return htmlfill.render(
334 335 base.render('admin/repo_groups/repo_group_edit.html'),
335 336 defaults=defaults,
336 337 encoding="UTF-8",
337 338 force_defaults=False
338 339 )
339 340
340 341 @HasRepoGroupPermissionLevelDecorator('admin')
341 342 def update_perms(self, group_name):
342 343 """
343 344 Update permissions for given repository group
344 345
345 346 :param group_name:
346 347 """
347 348
348 349 c.repo_group = db.RepoGroup.guess_instance(group_name)
349 350 valid_recursive_choices = ['none', 'repos', 'groups', 'all']
350 351 form_result = RepoGroupPermsForm(valid_recursive_choices)().to_python(request.POST)
351 352 if not request.authuser.is_admin:
352 353 if self._revoke_perms_on_yourself(form_result):
353 354 msg = _('Cannot revoke permission for yourself as admin')
354 355 webutils.flash(msg, category='warning')
355 356 raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
356 357 recursive = form_result['recursive']
357 358 # iterate over all members(if in recursive mode) of this groups and
358 359 # set the permissions !
359 360 # this can be potentially heavy operation
360 361 RepoGroupModel()._update_permissions(c.repo_group,
361 362 form_result['perms_new'],
362 363 form_result['perms_updates'],
363 364 recursive)
364 365 # TODO: implement this
365 366 #action_logger(request.authuser, 'admin_changed_repo_permissions',
366 367 # repo_name, request.ip_addr)
367 368 meta.Session().commit()
368 369 webutils.flash(_('Repository group permissions updated'), category='success')
369 370 raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
370 371
371 372 @HasRepoGroupPermissionLevelDecorator('admin')
372 373 def delete_perms(self, group_name):
373 374 try:
374 375 obj_type = request.POST.get('obj_type')
375 376 obj_id = None
376 377 if obj_type == 'user':
377 378 obj_id = safe_int(request.POST.get('user_id'))
378 379 elif obj_type == 'user_group':
379 380 obj_id = safe_int(request.POST.get('user_group_id'))
380 381
381 382 if not request.authuser.is_admin:
382 383 if obj_type == 'user' and request.authuser.user_id == obj_id:
383 384 msg = _('Cannot revoke permission for yourself as admin')
384 385 webutils.flash(msg, category='warning')
385 386 raise Exception('revoke admin permission on self')
386 387 recursive = request.POST.get('recursive', 'none')
387 388 if obj_type == 'user':
388 389 RepoGroupModel().delete_permission(repo_group=group_name,
389 390 obj=obj_id, obj_type='user',
390 391 recursive=recursive)
391 392 elif obj_type == 'user_group':
392 393 RepoGroupModel().delete_permission(repo_group=group_name,
393 394 obj=obj_id,
394 395 obj_type='user_group',
395 396 recursive=recursive)
396 397
397 398 meta.Session().commit()
398 399 except Exception:
399 400 log.error(traceback.format_exc())
400 401 webutils.flash(_('An error occurred during revoking of permission'),
401 402 category='error')
402 403 raise HTTPInternalServerError()
@@ -1,564 +1,565 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 these are form validation classes
16 16 http://formencode.org/module-formencode.validators.html
17 17 for list of all available validators
18 18
19 19 we can create our own validators
20 20
21 21 The table below outlines the options which can be used in a schema in addition to the validators themselves
22 22 pre_validators [] These validators will be applied before the schema
23 23 chained_validators [] These validators will be applied after the schema
24 24 allow_extra_fields False If True, then it is not an error when keys that aren't associated with a validator are present
25 25 filter_extra_fields False If True, then keys that aren't associated with a validator are removed
26 26 if_key_missing NoDefault If this is given, then any keys that aren't available but are expected will be replaced with this value (and then validated). This does not override a present .if_missing attribute on validators. NoDefault is a special FormEncode class to mean that no default values has been specified and therefore missing keys shouldn't take a default value.
27 27 ignore_key_missing False If True, then missing keys will be missing in the result, if the validator doesn't have .if_missing on it already
28 28
29 29
30 30 <name> = formencode.validators.<name of validator>
31 31 <name> must equal form name
32 32 list=[1,2,3,4,5]
33 33 for SELECT use formencode.All(OneOf(list), Int())
34 34
35 35 """
36 36 import logging
37 37
38 38 import formencode
39 39 from formencode import All
40 40 from tg.i18n import ugettext as _
41 41
42 42 import kallithea
43 43 from kallithea.model import validators as v
44 44
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 def LoginForm():
50 50 class _LoginForm(formencode.Schema):
51 51 allow_extra_fields = True
52 52 filter_extra_fields = True
53 53 username = v.UnicodeString(
54 54 strip=True,
55 55 min=1,
56 56 not_empty=True,
57 57 messages={
58 58 'empty': _('Please enter a login'),
59 59 'tooShort': _('Enter a value %(min)i characters long or more')}
60 60 )
61 61
62 62 password = v.UnicodeString(
63 63 strip=False,
64 64 min=3,
65 65 not_empty=True,
66 66 messages={
67 67 'empty': _('Please enter a password'),
68 68 'tooShort': _('Enter %(min)i characters or more')}
69 69 )
70 70
71 71 remember = v.StringBoolean(if_missing=False)
72 72
73 73 chained_validators = [v.ValidAuth()]
74 74 return _LoginForm
75 75
76 76
77 77 def PasswordChangeForm(username):
78 78 class _PasswordChangeForm(formencode.Schema):
79 79 allow_extra_fields = True
80 80 filter_extra_fields = True
81 81
82 82 current_password = v.ValidOldPassword(username)(not_empty=True)
83 83 new_password = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
84 84 new_password_confirmation = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
85 85
86 86 chained_validators = [v.ValidPasswordsMatch('new_password',
87 87 'new_password_confirmation')]
88 88 return _PasswordChangeForm
89 89
90 90
91 91 def UserForm(edit=False, old_data=None):
92 92 old_data = old_data or {}
93 93
94 94 class _UserForm(formencode.Schema):
95 95 allow_extra_fields = True
96 96 filter_extra_fields = True
97 97 username = All(v.UnicodeString(strip=True, min=1, not_empty=True),
98 98 v.ValidUsername(edit, old_data))
99 99 if edit:
100 100 new_password = All(
101 101 v.ValidPassword(),
102 102 v.UnicodeString(strip=False, min=6, not_empty=False)
103 103 )
104 104 password_confirmation = All(
105 105 v.ValidPassword(),
106 106 v.UnicodeString(strip=False, min=6, not_empty=False),
107 107 )
108 108 admin = v.StringBoolean(if_missing=False)
109 109 chained_validators = [v.ValidPasswordsMatch('new_password',
110 110 'password_confirmation')]
111 111 else:
112 112 password = All(
113 113 v.ValidPassword(),
114 114 v.UnicodeString(strip=False, min=6, not_empty=True)
115 115 )
116 116 password_confirmation = All(
117 117 v.ValidPassword(),
118 118 v.UnicodeString(strip=False, min=6, not_empty=False)
119 119 )
120 120 chained_validators = [v.ValidPasswordsMatch('password',
121 121 'password_confirmation')]
122 122
123 123 active = v.StringBoolean(if_missing=False)
124 124 firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
125 125 lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
126 126 email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
127 127 extern_name = v.UnicodeString(strip=True, if_missing=None)
128 128 extern_type = v.UnicodeString(strip=True, if_missing=None)
129 129 return _UserForm
130 130
131 131
132 132 def UserGroupForm(edit=False, old_data=None, available_members=None):
133 133 old_data = old_data or {}
134 134 available_members = available_members or []
135 135
136 136 class _UserGroupForm(formencode.Schema):
137 137 allow_extra_fields = True
138 138 filter_extra_fields = True
139 139
140 140 users_group_name = All(
141 141 v.UnicodeString(strip=True, min=1, not_empty=True),
142 142 v.ValidUserGroup(edit, old_data)
143 143 )
144 144 user_group_description = v.UnicodeString(strip=True, min=1,
145 145 not_empty=False)
146 146
147 147 users_group_active = v.StringBoolean(if_missing=False)
148 148
149 149 if edit:
150 150 users_group_members = v.OneOf(
151 151 available_members, hideList=False, testValueList=True,
152 152 if_missing=None, not_empty=False
153 153 )
154 154
155 155 return _UserGroupForm
156 156
157 157
158 158 def RepoGroupForm(edit=False, old_data=None, repo_groups=None,
159 159 can_create_in_root=False):
160 160 old_data = old_data or {}
161 161 repo_groups = repo_groups or []
162 162 repo_group_ids = [rg[0] for rg in repo_groups]
163 163
164 164 class _RepoGroupForm(formencode.Schema):
165 165 allow_extra_fields = True
166 166 filter_extra_fields = False
167 167
168 168 group_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
169 169 v.SlugifyName(),
170 170 v.ValidRegex(msg=_('Name must not contain only digits'))(r'(?!^\d+$)^.+$'))
171 171 group_description = v.UnicodeString(strip=True, min=1,
172 172 not_empty=False)
173 173 group_copy_permissions = v.StringBoolean(if_missing=False)
174 174
175 175 if edit:
176 owner = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
176 177 # FIXME: do a special check that we cannot move a group to one of
177 178 # its children
178 179 pass
179 180
180 181 parent_group_id = All(v.CanCreateGroup(can_create_in_root),
181 182 v.OneOf(repo_group_ids, hideList=False,
182 183 testValueList=True,
183 184 if_missing=None, not_empty=True),
184 185 v.Int(min=-1, not_empty=True))
185 186 chained_validators = [v.ValidRepoGroup(edit, old_data)]
186 187
187 188 return _RepoGroupForm
188 189
189 190
190 191 def RegisterForm(edit=False, old_data=None):
191 192 class _RegisterForm(formencode.Schema):
192 193 allow_extra_fields = True
193 194 filter_extra_fields = True
194 195 username = All(
195 196 v.ValidUsername(edit, old_data),
196 197 v.UnicodeString(strip=True, min=1, not_empty=True)
197 198 )
198 199 password = All(
199 200 v.ValidPassword(),
200 201 v.UnicodeString(strip=False, min=6, not_empty=True)
201 202 )
202 203 password_confirmation = All(
203 204 v.ValidPassword(),
204 205 v.UnicodeString(strip=False, min=6, not_empty=True)
205 206 )
206 207 active = v.StringBoolean(if_missing=False)
207 208 firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
208 209 lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
209 210 email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
210 211
211 212 chained_validators = [v.ValidPasswordsMatch('password',
212 213 'password_confirmation')]
213 214
214 215 return _RegisterForm
215 216
216 217
217 218 def PasswordResetRequestForm():
218 219 class _PasswordResetRequestForm(formencode.Schema):
219 220 allow_extra_fields = True
220 221 filter_extra_fields = True
221 222 email = v.Email(not_empty=True)
222 223 return _PasswordResetRequestForm
223 224
224 225
225 226 def PasswordResetConfirmationForm():
226 227 class _PasswordResetConfirmationForm(formencode.Schema):
227 228 allow_extra_fields = True
228 229 filter_extra_fields = True
229 230
230 231 email = v.UnicodeString(strip=True, not_empty=True)
231 232 timestamp = v.Number(strip=True, not_empty=True)
232 233 token = v.UnicodeString(strip=True, not_empty=True)
233 234 password = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
234 235 password_confirm = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
235 236
236 237 chained_validators = [v.ValidPasswordsMatch('password',
237 238 'password_confirm')]
238 239 return _PasswordResetConfirmationForm
239 240
240 241
241 242 def RepoForm(edit=False, old_data=None, supported_backends=kallithea.BACKENDS,
242 243 repo_groups=None, landing_revs=None):
243 244 old_data = old_data or {}
244 245 repo_groups = repo_groups or []
245 246 landing_revs = landing_revs or []
246 247 repo_group_ids = [rg[0] for rg in repo_groups]
247 248
248 249 class _RepoForm(formencode.Schema):
249 250 allow_extra_fields = True
250 251 filter_extra_fields = False
251 252 repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
252 253 v.SlugifyName())
253 254 repo_group = All(v.CanWriteGroup(old_data),
254 255 v.OneOf(repo_group_ids, hideList=True),
255 256 v.Int(min=-1, not_empty=True))
256 257 repo_type = v.OneOf(supported_backends, required=False,
257 258 if_missing=old_data.get('repo_type'))
258 259 repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
259 260 repo_private = v.StringBoolean(if_missing=False)
260 261 repo_landing_rev = v.OneOf(landing_revs, hideList=True)
261 262 repo_copy_permissions = v.StringBoolean(if_missing=False)
262 263 clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
263 264
264 265 repo_enable_statistics = v.StringBoolean(if_missing=False)
265 266 repo_enable_downloads = v.StringBoolean(if_missing=False)
266 267
267 268 if edit:
268 269 owner = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
269 270 # Not a real field - just for reference for validation:
270 271 # clone_uri_hidden = v.UnicodeString(if_missing='')
271 272
272 273 chained_validators = [v.ValidCloneUri(),
273 274 v.ValidRepoName(edit, old_data)]
274 275 return _RepoForm
275 276
276 277
277 278 def RepoPermsForm():
278 279 class _RepoPermsForm(formencode.Schema):
279 280 allow_extra_fields = True
280 281 filter_extra_fields = False
281 282 chained_validators = [v.ValidPerms(type_='repo')]
282 283 return _RepoPermsForm
283 284
284 285
285 286 def RepoGroupPermsForm(valid_recursive_choices):
286 287 class _RepoGroupPermsForm(formencode.Schema):
287 288 allow_extra_fields = True
288 289 filter_extra_fields = False
289 290 recursive = v.OneOf(valid_recursive_choices)
290 291 chained_validators = [v.ValidPerms(type_='repo_group')]
291 292 return _RepoGroupPermsForm
292 293
293 294
294 295 def UserGroupPermsForm():
295 296 class _UserPermsForm(formencode.Schema):
296 297 allow_extra_fields = True
297 298 filter_extra_fields = False
298 299 chained_validators = [v.ValidPerms(type_='user_group')]
299 300 return _UserPermsForm
300 301
301 302
302 303 def RepoFieldForm():
303 304 class _RepoFieldForm(formencode.Schema):
304 305 filter_extra_fields = True
305 306 allow_extra_fields = True
306 307
307 308 new_field_key = All(v.FieldKey(),
308 309 v.UnicodeString(strip=True, min=3, not_empty=True))
309 310 new_field_value = v.UnicodeString(not_empty=False, if_missing='')
310 311 new_field_type = v.OneOf(['str', 'unicode', 'list', 'tuple'],
311 312 if_missing='str')
312 313 new_field_label = v.UnicodeString(not_empty=False)
313 314 new_field_desc = v.UnicodeString(not_empty=False)
314 315
315 316 return _RepoFieldForm
316 317
317 318
318 319 def RepoForkForm(edit=False, old_data=None, supported_backends=kallithea.BACKENDS,
319 320 repo_groups=None, landing_revs=None):
320 321 old_data = old_data or {}
321 322 repo_groups = repo_groups or []
322 323 landing_revs = landing_revs or []
323 324 repo_group_ids = [rg[0] for rg in repo_groups]
324 325
325 326 class _RepoForkForm(formencode.Schema):
326 327 allow_extra_fields = True
327 328 filter_extra_fields = False
328 329 repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
329 330 v.SlugifyName())
330 331 repo_group = All(v.CanWriteGroup(),
331 332 v.OneOf(repo_group_ids, hideList=True),
332 333 v.Int(min=-1, not_empty=True))
333 334 repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
334 335 description = v.UnicodeString(strip=True, min=1, not_empty=True)
335 336 private = v.StringBoolean(if_missing=False)
336 337 copy_permissions = v.StringBoolean(if_missing=False)
337 338 update_after_clone = v.StringBoolean(if_missing=False)
338 339 fork_parent_id = v.UnicodeString()
339 340 chained_validators = [v.ValidForkName(edit, old_data)]
340 341 landing_rev = v.OneOf(landing_revs, hideList=True)
341 342
342 343 return _RepoForkForm
343 344
344 345
345 346 def ApplicationSettingsForm():
346 347 class _ApplicationSettingsForm(formencode.Schema):
347 348 allow_extra_fields = True
348 349 filter_extra_fields = False
349 350 title = v.UnicodeString(strip=True, not_empty=False)
350 351 realm = v.UnicodeString(strip=True, min=1, not_empty=True)
351 352 ga_code = v.UnicodeString(strip=True, min=1, not_empty=False)
352 353 captcha_public_key = v.UnicodeString(strip=True, min=1, not_empty=False)
353 354 captcha_private_key = v.UnicodeString(strip=True, min=1, not_empty=False)
354 355
355 356 return _ApplicationSettingsForm
356 357
357 358
358 359 def ApplicationVisualisationForm():
359 360 class _ApplicationVisualisationForm(formencode.Schema):
360 361 allow_extra_fields = True
361 362 filter_extra_fields = False
362 363 show_public_icon = v.StringBoolean(if_missing=False)
363 364 show_private_icon = v.StringBoolean(if_missing=False)
364 365 stylify_metalabels = v.StringBoolean(if_missing=False)
365 366
366 367 repository_fields = v.StringBoolean(if_missing=False)
367 368 lightweight_journal = v.StringBoolean(if_missing=False)
368 369 dashboard_items = v.Int(min=5, not_empty=True)
369 370 admin_grid_items = v.Int(min=5, not_empty=True)
370 371 show_version = v.StringBoolean(if_missing=False)
371 372 use_gravatar = v.StringBoolean(if_missing=False)
372 373 gravatar_url = v.UnicodeString(min=3)
373 374 clone_uri_tmpl = v.UnicodeString(min=3)
374 375 clone_ssh_tmpl = v.UnicodeString()
375 376
376 377 return _ApplicationVisualisationForm
377 378
378 379
379 380 def ApplicationUiSettingsForm():
380 381 class _ApplicationUiSettingsForm(formencode.Schema):
381 382 allow_extra_fields = True
382 383 filter_extra_fields = False
383 384 paths_root_path = All(
384 385 v.ValidPath(),
385 386 v.UnicodeString(strip=True, min=1, not_empty=True)
386 387 )
387 388 hooks_changegroup_kallithea_update = v.StringBoolean(if_missing=False)
388 389 hooks_changegroup_kallithea_repo_size = v.StringBoolean(if_missing=False)
389 390
390 391 extensions_largefiles = v.StringBoolean(if_missing=False)
391 392 extensions_hggit = v.StringBoolean(if_missing=False)
392 393
393 394 return _ApplicationUiSettingsForm
394 395
395 396
396 397 def DefaultPermissionsForm(repo_perms_choices, group_perms_choices,
397 398 user_group_perms_choices, create_choices,
398 399 user_group_create_choices, fork_choices,
399 400 register_choices, extern_activate_choices):
400 401 class _DefaultPermissionsForm(formencode.Schema):
401 402 allow_extra_fields = True
402 403 filter_extra_fields = True
403 404 overwrite_default_repo = v.StringBoolean(if_missing=False)
404 405 overwrite_default_group = v.StringBoolean(if_missing=False)
405 406 overwrite_default_user_group = v.StringBoolean(if_missing=False)
406 407 anonymous = v.StringBoolean(if_missing=False)
407 408 default_repo_perm = v.OneOf(repo_perms_choices)
408 409 default_group_perm = v.OneOf(group_perms_choices)
409 410 default_user_group_perm = v.OneOf(user_group_perms_choices)
410 411
411 412 default_repo_create = v.OneOf(create_choices)
412 413 default_user_group_create = v.OneOf(user_group_create_choices)
413 414 default_fork = v.OneOf(fork_choices)
414 415
415 416 default_register = v.OneOf(register_choices)
416 417 default_extern_activate = v.OneOf(extern_activate_choices)
417 418 return _DefaultPermissionsForm
418 419
419 420
420 421 def CustomDefaultPermissionsForm():
421 422 class _CustomDefaultPermissionsForm(formencode.Schema):
422 423 filter_extra_fields = True
423 424 allow_extra_fields = True
424 425
425 426 create_repo_perm = v.StringBoolean(if_missing=False)
426 427 create_user_group_perm = v.StringBoolean(if_missing=False)
427 428 #create_repo_group_perm Impl. later
428 429
429 430 fork_repo_perm = v.StringBoolean(if_missing=False)
430 431
431 432 return _CustomDefaultPermissionsForm
432 433
433 434
434 435 def DefaultsForm(edit=False, old_data=None, supported_backends=kallithea.BACKENDS):
435 436 class _DefaultsForm(formencode.Schema):
436 437 allow_extra_fields = True
437 438 filter_extra_fields = True
438 439 default_repo_type = v.OneOf(supported_backends)
439 440 default_repo_private = v.StringBoolean(if_missing=False)
440 441 default_repo_enable_statistics = v.StringBoolean(if_missing=False)
441 442 default_repo_enable_downloads = v.StringBoolean(if_missing=False)
442 443
443 444 return _DefaultsForm
444 445
445 446
446 447 def AuthSettingsForm(current_active_modules):
447 448 class _AuthSettingsForm(formencode.Schema):
448 449 allow_extra_fields = True
449 450 filter_extra_fields = True
450 451 auth_plugins = All(v.ValidAuthPlugins(),
451 452 v.UniqueListFromString()(not_empty=True))
452 453
453 454 def __init__(self, *args, **kwargs):
454 455 # The auth plugins tell us what form validators they use
455 456 if current_active_modules:
456 457 import kallithea.lib.auth_modules
457 458 from kallithea.lib.auth_modules import LazyFormencode
458 459 for module in current_active_modules:
459 460 plugin = kallithea.lib.auth_modules.loadplugin(module)
460 461 plugin_name = plugin.name
461 462 for sv in plugin.plugin_settings():
462 463 newk = "auth_%s_%s" % (plugin_name, sv["name"])
463 464 # can be a LazyFormencode object from plugin settings
464 465 validator = sv["validator"]
465 466 if isinstance(validator, LazyFormencode):
466 467 validator = validator()
467 468 # init all lazy validators from formencode.All
468 469 if isinstance(validator, All):
469 470 init_validators = []
470 471 for validator in validator.validators:
471 472 if isinstance(validator, LazyFormencode):
472 473 validator = validator()
473 474 init_validators.append(validator)
474 475 validator.validators = init_validators
475 476
476 477 self.add_field(newk, validator)
477 478 formencode.Schema.__init__(self, *args, **kwargs)
478 479
479 480 return _AuthSettingsForm
480 481
481 482
482 483 def LdapSettingsForm(tls_reqcert_choices, search_scope_choices,
483 484 tls_kind_choices):
484 485 class _LdapSettingsForm(formencode.Schema):
485 486 allow_extra_fields = True
486 487 filter_extra_fields = True
487 488 #pre_validators = [LdapLibValidator]
488 489 ldap_active = v.StringBoolean(if_missing=False)
489 490 ldap_host = v.UnicodeString(strip=True,)
490 491 ldap_port = v.Number(strip=True,)
491 492 ldap_tls_kind = v.OneOf(tls_kind_choices)
492 493 ldap_tls_reqcert = v.OneOf(tls_reqcert_choices)
493 494 ldap_dn_user = v.UnicodeString(strip=True,)
494 495 ldap_dn_pass = v.UnicodeString(strip=True,)
495 496 ldap_base_dn = v.UnicodeString(strip=True,)
496 497 ldap_filter = v.UnicodeString(strip=True,)
497 498 ldap_search_scope = v.OneOf(search_scope_choices)
498 499 ldap_attr_login = v.AttrLoginValidator()(not_empty=True)
499 500 ldap_attr_firstname = v.UnicodeString(strip=True,)
500 501 ldap_attr_lastname = v.UnicodeString(strip=True,)
501 502 ldap_attr_email = v.UnicodeString(strip=True,)
502 503
503 504 return _LdapSettingsForm
504 505
505 506
506 507 def UserExtraEmailForm():
507 508 class _UserExtraEmailForm(formencode.Schema):
508 509 email = All(v.UniqSystemEmail(), v.Email(not_empty=True))
509 510 return _UserExtraEmailForm
510 511
511 512
512 513 def UserExtraIpForm():
513 514 class _UserExtraIpForm(formencode.Schema):
514 515 ip = v.ValidIp()(not_empty=True)
515 516 return _UserExtraIpForm
516 517
517 518
518 519 def PullRequestForm(repo_id):
519 520 class _PullRequestForm(formencode.Schema):
520 521 allow_extra_fields = True
521 522 filter_extra_fields = True
522 523
523 524 org_repo = v.UnicodeString(strip=True, required=True)
524 525 org_ref = v.UnicodeString(strip=True, required=True)
525 526 other_repo = v.UnicodeString(strip=True, required=True)
526 527 other_ref = v.UnicodeString(strip=True, required=True)
527 528
528 529 pullrequest_title = v.UnicodeString(strip=True, required=True)
529 530 pullrequest_desc = v.UnicodeString(strip=True, required=False)
530 531
531 532 return _PullRequestForm
532 533
533 534
534 535 def PullRequestPostForm():
535 536 class _PullRequestPostForm(formencode.Schema):
536 537 allow_extra_fields = True
537 538 filter_extra_fields = True
538 539
539 540 pullrequest_title = v.UnicodeString(strip=True, required=True)
540 541 pullrequest_desc = v.UnicodeString(strip=True, required=False)
541 542 org_review_members = v.Set()
542 543 review_members = v.Set()
543 544 updaterev = v.UnicodeString(strip=True, required=False, if_missing=None)
544 545 owner = All(v.UnicodeString(strip=True, required=True),
545 546 v.ValidRepoUser())
546 547
547 548 return _PullRequestPostForm
548 549
549 550
550 551 def GistForm(lifetime_options):
551 552 class _GistForm(formencode.Schema):
552 553 allow_extra_fields = True
553 554 filter_extra_fields = True
554 555
555 556 filename = All(v.BasePath()(),
556 557 v.UnicodeString(strip=True, required=False))
557 558 description = v.UnicodeString(required=False, if_missing='')
558 559 lifetime = v.OneOf(lifetime_options)
559 560 mimetype = v.UnicodeString(required=False, if_missing=None)
560 561 content = v.UnicodeString(required=True, not_empty=True)
561 562 public = v.UnicodeString(required=False, if_missing='')
562 563 private = v.UnicodeString(required=False, if_missing='')
563 564
564 565 return _GistForm
@@ -1,532 +1,534 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.model.repo_group
16 16 ~~~~~~~~~~~~~~~~~~~~~~~~~~
17 17
18 18 repo group model for Kallithea
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Jan 25, 2011
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28
29 29 import datetime
30 30 import logging
31 31 import os
32 32 import shutil
33 33 import traceback
34 34
35 35 import kallithea.lib.utils2
36 36 from kallithea.lib.utils2 import LazyProperty
37 37 from kallithea.model import db, meta, repo
38 38
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 class RepoGroupModel(object):
44 44
45 45 @LazyProperty
46 46 def repos_path(self):
47 47 """
48 48 Gets the repositories root path from database
49 49 """
50 50
51 51 q = db.Ui.get_by_key('paths', '/')
52 52 return q.ui_value
53 53
54 54 def _create_default_perms(self, new_group):
55 55 # create default permission
56 56 default_perm = 'group.read'
57 57 def_user = db.User.get_default_user()
58 58 for p in def_user.user_perms:
59 59 if p.permission.permission_name.startswith('group.'):
60 60 default_perm = p.permission.permission_name
61 61 break
62 62
63 63 repo_group_to_perm = db.UserRepoGroupToPerm()
64 64 repo_group_to_perm.permission = db.Permission.get_by_key(default_perm)
65 65
66 66 repo_group_to_perm.group = new_group
67 67 repo_group_to_perm.user_id = def_user.user_id
68 68 meta.Session().add(repo_group_to_perm)
69 69 return repo_group_to_perm
70 70
71 71 def _create_group(self, group_name):
72 72 """
73 73 makes repository group on filesystem
74 74
75 75 :param repo_name:
76 76 :param parent_id:
77 77 """
78 78
79 79 create_path = os.path.join(self.repos_path, group_name)
80 80 log.debug('creating new group in %s', create_path)
81 81
82 82 if os.path.isdir(create_path):
83 83 raise Exception('That directory already exists !')
84 84
85 85 os.makedirs(create_path)
86 86 log.debug('Created group in %s', create_path)
87 87
88 88 def _rename_group(self, old, new):
89 89 """
90 90 Renames a group on filesystem
91 91
92 92 :param group_name:
93 93 """
94 94
95 95 if old == new:
96 96 log.debug('skipping group rename')
97 97 return
98 98
99 99 log.debug('renaming repository group from %s to %s', old, new)
100 100
101 101 old_path = os.path.join(self.repos_path, old)
102 102 new_path = os.path.join(self.repos_path, new)
103 103
104 104 log.debug('renaming repos paths from %s to %s', old_path, new_path)
105 105
106 106 if os.path.isdir(new_path):
107 107 raise Exception('Was trying to rename to already '
108 108 'existing dir %s' % new_path)
109 109 shutil.move(old_path, new_path)
110 110
111 111 def _delete_group(self, group, force_delete=False):
112 112 """
113 113 Deletes a group from a filesystem
114 114
115 115 :param group: instance of group from database
116 116 :param force_delete: use shutil rmtree to remove all objects
117 117 """
118 118 paths = group.full_path.split(kallithea.URL_SEP)
119 119 paths = os.sep.join(paths)
120 120
121 121 rm_path = os.path.join(self.repos_path, paths)
122 122 log.info("Removing group %s", rm_path)
123 123 # delete only if that path really exists
124 124 if os.path.isdir(rm_path):
125 125 if force_delete:
126 126 shutil.rmtree(rm_path)
127 127 else:
128 128 # archive that group
129 129 _now = datetime.datetime.now()
130 130 _ms = str(_now.microsecond).rjust(6, '0')
131 131 _d = 'rm__%s_GROUP_%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
132 132 group.name)
133 133 shutil.move(rm_path, os.path.join(self.repos_path, _d))
134 134
135 135 def create(self, group_name, group_description, owner, parent=None,
136 136 just_db=False, copy_permissions=False):
137 137 try:
138 138 if kallithea.lib.utils2.repo_name_slug(group_name) != group_name:
139 139 raise Exception('invalid repo group name %s' % group_name)
140 140
141 141 owner = db.User.guess_instance(owner)
142 142 parent_group = db.RepoGroup.guess_instance(parent)
143 143 new_repo_group = db.RepoGroup()
144 144 new_repo_group.owner = owner
145 145 new_repo_group.group_description = group_description or group_name
146 146 new_repo_group.parent_group = parent_group
147 147 new_repo_group.group_name = new_repo_group.get_new_name(group_name)
148 148
149 149 meta.Session().add(new_repo_group)
150 150
151 151 # create an ADMIN permission for owner except if we're super admin,
152 152 # later owner should go into the owner field of groups
153 153 if not owner.is_admin:
154 154 self.grant_user_permission(repo_group=new_repo_group,
155 155 user=owner, perm='group.admin')
156 156
157 157 if parent_group and copy_permissions:
158 158 # copy permissions from parent
159 159 user_perms = db.UserRepoGroupToPerm.query() \
160 160 .filter(db.UserRepoGroupToPerm.group == parent_group).all()
161 161
162 162 group_perms = db.UserGroupRepoGroupToPerm.query() \
163 163 .filter(db.UserGroupRepoGroupToPerm.group == parent_group).all()
164 164
165 165 for perm in user_perms:
166 166 # don't copy over the permission for user who is creating
167 167 # this group, if he is not super admin he get's admin
168 168 # permission set above
169 169 if perm.user != owner or owner.is_admin:
170 170 db.UserRepoGroupToPerm.create(perm.user, new_repo_group, perm.permission)
171 171
172 172 for perm in group_perms:
173 173 db.UserGroupRepoGroupToPerm.create(perm.users_group, new_repo_group, perm.permission)
174 174 else:
175 175 self._create_default_perms(new_repo_group)
176 176
177 177 if not just_db:
178 178 # we need to flush here, in order to check if database won't
179 179 # throw any exceptions, create filesystem dirs at the very end
180 180 meta.Session().flush()
181 181 self._create_group(new_repo_group.group_name)
182 182
183 183 return new_repo_group
184 184 except Exception:
185 185 log.error(traceback.format_exc())
186 186 raise
187 187
188 188 def _update_permissions(self, repo_group, perms_new=None,
189 189 perms_updates=None, recursive=None,
190 190 check_perms=True):
191 191 from kallithea.lib.auth import HasUserGroupPermissionLevel
192 192
193 193 if not perms_new:
194 194 perms_new = []
195 195 if not perms_updates:
196 196 perms_updates = []
197 197
198 198 def _set_perm_user(obj, user, perm):
199 199 if isinstance(obj, db.RepoGroup):
200 200 self.grant_user_permission(repo_group=obj, user=user, perm=perm)
201 201 elif isinstance(obj, db.Repository):
202 202 user = db.User.guess_instance(user)
203 203
204 204 # private repos will not allow to change the default permissions
205 205 # using recursive mode
206 206 if obj.private and user.is_default_user:
207 207 return
208 208
209 209 # we set group permission but we have to switch to repo
210 210 # permission
211 211 perm = perm.replace('group.', 'repository.')
212 212 repo.RepoModel().grant_user_permission(
213 213 repo=obj, user=user, perm=perm
214 214 )
215 215
216 216 def _set_perm_group(obj, users_group, perm):
217 217 if isinstance(obj, db.RepoGroup):
218 218 self.grant_user_group_permission(repo_group=obj,
219 219 group_name=users_group,
220 220 perm=perm)
221 221 elif isinstance(obj, db.Repository):
222 222 # we set group permission but we have to switch to repo
223 223 # permission
224 224 perm = perm.replace('group.', 'repository.')
225 225 repo.RepoModel().grant_user_group_permission(
226 226 repo=obj, group_name=users_group, perm=perm
227 227 )
228 228
229 229 # start updates
230 230 updates = []
231 231 log.debug('Now updating permissions for %s in recursive mode:%s',
232 232 repo_group, recursive)
233 233
234 234 for obj in repo_group.recursive_groups_and_repos():
235 235 # iterated obj is an instance of a repos group or repository in
236 236 # that group, recursive option can be: none, repos, groups, all
237 237 if recursive == 'all':
238 238 pass
239 239 elif recursive == 'repos':
240 240 # skip groups, other than this one
241 241 if isinstance(obj, db.RepoGroup) and not obj == repo_group:
242 242 continue
243 243 elif recursive == 'groups':
244 244 # skip repos
245 245 if isinstance(obj, db.Repository):
246 246 continue
247 247 else: # recursive == 'none': # DEFAULT don't apply to iterated objects
248 248 obj = repo_group
249 249 # also we do a break at the end of this loop.
250 250
251 251 # update permissions
252 252 for member, perm, member_type in perms_updates:
253 253 ## set for user
254 254 if member_type == 'user':
255 255 # this updates also current one if found
256 256 _set_perm_user(obj, user=member, perm=perm)
257 257 ## set for user group
258 258 else:
259 259 # check if we have permissions to alter this usergroup's access
260 260 if not check_perms or HasUserGroupPermissionLevel('read')(member):
261 261 _set_perm_group(obj, users_group=member, perm=perm)
262 262 # set new permissions
263 263 for member, perm, member_type in perms_new:
264 264 if member_type == 'user':
265 265 _set_perm_user(obj, user=member, perm=perm)
266 266 else:
267 267 # check if we have permissions to alter this usergroup's access
268 268 if not check_perms or HasUserGroupPermissionLevel('read')(member):
269 269 _set_perm_group(obj, users_group=member, perm=perm)
270 270 updates.append(obj)
271 271 # if it's not recursive call for all,repos,groups
272 272 # break the loop and don't proceed with other changes
273 273 if recursive not in ['all', 'repos', 'groups']:
274 274 break
275 275
276 276 return updates
277 277
278 278 def update(self, repo_group, repo_group_args):
279 279 try:
280 280 repo_group = db.RepoGroup.guess_instance(repo_group)
281 281 old_path = repo_group.full_path
282 282
283 283 # change properties
284 if 'owner' in repo_group_args:
285 repo_group.owner = db.User.get_by_username(repo_group_args['owner'])
284 286 if 'group_description' in repo_group_args:
285 287 repo_group.group_description = repo_group_args['group_description']
286 288 if 'parent_group_id' in repo_group_args:
287 289 assert repo_group_args['parent_group_id'] != '-1', repo_group_args # RepoGroupForm should have converted to None
288 290 repo_group.parent_group = db.RepoGroup.get(repo_group_args['parent_group_id'])
289 291 repo_group.group_name = repo_group.get_new_name(repo_group.name)
290 292 if 'group_name' in repo_group_args:
291 293 group_name = repo_group_args['group_name']
292 294 if kallithea.lib.utils2.repo_name_slug(group_name) != group_name:
293 295 raise Exception('invalid repo group name %s' % group_name)
294 296 repo_group.group_name = repo_group.get_new_name(group_name)
295 297 new_path = repo_group.full_path
296 298 meta.Session().add(repo_group)
297 299
298 300 # iterate over all members of this groups and do fixes
299 301 # if obj is a repoGroup also fix the name of the group according
300 302 # to the parent
301 303 # if obj is a Repo fix it's name
302 304 # this can be potentially heavy operation
303 305 for obj in repo_group.recursive_groups_and_repos():
304 306 # set the value from it's parent
305 307 if isinstance(obj, db.RepoGroup):
306 308 new_name = obj.get_new_name(obj.name)
307 309 log.debug('Fixing group %s to new name %s'
308 310 % (obj.group_name, new_name))
309 311 obj.group_name = new_name
310 312 elif isinstance(obj, db.Repository):
311 313 # we need to get all repositories from this new group and
312 314 # rename them accordingly to new group path
313 315 new_name = obj.get_new_name(obj.just_name)
314 316 log.debug('Fixing repo %s to new name %s'
315 317 % (obj.repo_name, new_name))
316 318 obj.repo_name = new_name
317 319
318 320 self._rename_group(old_path, new_path)
319 321
320 322 return repo_group
321 323 except Exception:
322 324 log.error(traceback.format_exc())
323 325 raise
324 326
325 327 def delete(self, repo_group, force_delete=False):
326 328 repo_group = db.RepoGroup.guess_instance(repo_group)
327 329 try:
328 330 meta.Session().delete(repo_group)
329 331 self._delete_group(repo_group, force_delete)
330 332 except Exception:
331 333 log.error('Error removing repo_group %s', repo_group)
332 334 raise
333 335
334 336 def add_permission(self, repo_group, obj, obj_type, perm, recursive):
335 337 repo_group = db.RepoGroup.guess_instance(repo_group)
336 338 perm = db.Permission.guess_instance(perm)
337 339
338 340 for el in repo_group.recursive_groups_and_repos():
339 341 # iterated obj is an instance of a repos group or repository in
340 342 # that group, recursive option can be: none, repos, groups, all
341 343 if recursive == 'all':
342 344 pass
343 345 elif recursive == 'repos':
344 346 # skip groups, other than this one
345 347 if isinstance(el, db.RepoGroup) and not el == repo_group:
346 348 continue
347 349 elif recursive == 'groups':
348 350 # skip repos
349 351 if isinstance(el, db.Repository):
350 352 continue
351 353 else: # recursive == 'none': # DEFAULT don't apply to iterated objects
352 354 el = repo_group
353 355 # also we do a break at the end of this loop.
354 356
355 357 if isinstance(el, db.RepoGroup):
356 358 if obj_type == 'user':
357 359 RepoGroupModel().grant_user_permission(el, user=obj, perm=perm)
358 360 elif obj_type == 'user_group':
359 361 RepoGroupModel().grant_user_group_permission(el, group_name=obj, perm=perm)
360 362 else:
361 363 raise Exception('undefined object type %s' % obj_type)
362 364 elif isinstance(el, db.Repository):
363 365 # for repos we need to hotfix the name of permission
364 366 _perm = perm.permission_name.replace('group.', 'repository.')
365 367 if obj_type == 'user':
366 368 repo.RepoModel().grant_user_permission(el, user=obj, perm=_perm)
367 369 elif obj_type == 'user_group':
368 370 repo.RepoModel().grant_user_group_permission(el, group_name=obj, perm=_perm)
369 371 else:
370 372 raise Exception('undefined object type %s' % obj_type)
371 373 else:
372 374 raise Exception('el should be instance of Repository or '
373 375 'RepositoryGroup got %s instead' % type(el))
374 376
375 377 # if it's not recursive call for all,repos,groups
376 378 # break the loop and don't proceed with other changes
377 379 if recursive not in ['all', 'repos', 'groups']:
378 380 break
379 381
380 382 def delete_permission(self, repo_group, obj, obj_type, recursive):
381 383 """
382 384 Revokes permission for repo_group for given obj(user or users_group),
383 385 obj_type can be user or user group
384 386
385 387 :param repo_group:
386 388 :param obj: user or user group id
387 389 :param obj_type: user or user group type
388 390 :param recursive: recurse to all children of group
389 391 """
390 392 repo_group = db.RepoGroup.guess_instance(repo_group)
391 393
392 394 for el in repo_group.recursive_groups_and_repos():
393 395 # iterated obj is an instance of a repos group or repository in
394 396 # that group, recursive option can be: none, repos, groups, all
395 397 if recursive == 'all':
396 398 pass
397 399 elif recursive == 'repos':
398 400 # skip groups, other than this one
399 401 if isinstance(el, db.RepoGroup) and not el == repo_group:
400 402 continue
401 403 elif recursive == 'groups':
402 404 # skip repos
403 405 if isinstance(el, db.Repository):
404 406 continue
405 407 else: # recursive == 'none': # DEFAULT don't apply to iterated objects
406 408 el = repo_group
407 409 # also we do a break at the end of this loop.
408 410
409 411 if isinstance(el, db.RepoGroup):
410 412 if obj_type == 'user':
411 413 RepoGroupModel().revoke_user_permission(el, user=obj)
412 414 elif obj_type == 'user_group':
413 415 RepoGroupModel().revoke_user_group_permission(el, group_name=obj)
414 416 else:
415 417 raise Exception('undefined object type %s' % obj_type)
416 418 elif isinstance(el, db.Repository):
417 419 if obj_type == 'user':
418 420 repo.RepoModel().revoke_user_permission(el, user=obj)
419 421 elif obj_type == 'user_group':
420 422 repo.RepoModel().revoke_user_group_permission(el, group_name=obj)
421 423 else:
422 424 raise Exception('undefined object type %s' % obj_type)
423 425 else:
424 426 raise Exception('el should be instance of Repository or '
425 427 'RepositoryGroup got %s instead' % type(el))
426 428
427 429 # if it's not recursive call for all,repos,groups
428 430 # break the loop and don't proceed with other changes
429 431 if recursive not in ['all', 'repos', 'groups']:
430 432 break
431 433
432 434 def grant_user_permission(self, repo_group, user, perm):
433 435 """
434 436 Grant permission for user on given repository group, or update
435 437 existing one if found
436 438
437 439 :param repo_group: Instance of RepoGroup, repositories_group_id,
438 440 or repositories_group name
439 441 :param user: Instance of User, user_id or username
440 442 :param perm: Instance of Permission, or permission_name
441 443 """
442 444
443 445 repo_group = db.RepoGroup.guess_instance(repo_group)
444 446 user = db.User.guess_instance(user)
445 447 permission = db.Permission.guess_instance(perm)
446 448
447 449 # check if we have that permission already
448 450 obj = db.UserRepoGroupToPerm.query() \
449 451 .filter(db.UserRepoGroupToPerm.user == user) \
450 452 .filter(db.UserRepoGroupToPerm.group == repo_group) \
451 453 .scalar()
452 454 if obj is None:
453 455 # create new !
454 456 obj = db.UserRepoGroupToPerm()
455 457 meta.Session().add(obj)
456 458 obj.group = repo_group
457 459 obj.user = user
458 460 obj.permission = permission
459 461 log.debug('Granted perm %s to %s on %s', perm, user, repo_group)
460 462 return obj
461 463
462 464 def revoke_user_permission(self, repo_group, user):
463 465 """
464 466 Revoke permission for user on given repository group
465 467
466 468 :param repo_group: Instance of RepoGroup, repositories_group_id,
467 469 or repositories_group name
468 470 :param user: Instance of User, user_id or username
469 471 """
470 472
471 473 repo_group = db.RepoGroup.guess_instance(repo_group)
472 474 user = db.User.guess_instance(user)
473 475
474 476 obj = db.UserRepoGroupToPerm.query() \
475 477 .filter(db.UserRepoGroupToPerm.user == user) \
476 478 .filter(db.UserRepoGroupToPerm.group == repo_group) \
477 479 .scalar()
478 480 if obj is not None:
479 481 meta.Session().delete(obj)
480 482 log.debug('Revoked perm on %s on %s', repo_group, user)
481 483
482 484 def grant_user_group_permission(self, repo_group, group_name, perm):
483 485 """
484 486 Grant permission for user group on given repository group, or update
485 487 existing one if found
486 488
487 489 :param repo_group: Instance of RepoGroup, repositories_group_id,
488 490 or repositories_group name
489 491 :param group_name: Instance of UserGroup, users_group_id,
490 492 or user group name
491 493 :param perm: Instance of Permission, or permission_name
492 494 """
493 495 repo_group = db.RepoGroup.guess_instance(repo_group)
494 496 group_name = db.UserGroup.guess_instance(group_name)
495 497 permission = db.Permission.guess_instance(perm)
496 498
497 499 # check if we have that permission already
498 500 obj = db.UserGroupRepoGroupToPerm.query() \
499 501 .filter(db.UserGroupRepoGroupToPerm.group == repo_group) \
500 502 .filter(db.UserGroupRepoGroupToPerm.users_group == group_name) \
501 503 .scalar()
502 504
503 505 if obj is None:
504 506 # create new
505 507 obj = db.UserGroupRepoGroupToPerm()
506 508 meta.Session().add(obj)
507 509
508 510 obj.group = repo_group
509 511 obj.users_group = group_name
510 512 obj.permission = permission
511 513 log.debug('Granted perm %s to %s on %s', perm, group_name, repo_group)
512 514 return obj
513 515
514 516 def revoke_user_group_permission(self, repo_group, group_name):
515 517 """
516 518 Revoke permission for user group on given repository group
517 519
518 520 :param repo_group: Instance of RepoGroup, repositories_group_id,
519 521 or repositories_group name
520 522 :param group_name: Instance of UserGroup, users_group_id,
521 523 or user group name
522 524 """
523 525 repo_group = db.RepoGroup.guess_instance(repo_group)
524 526 group_name = db.UserGroup.guess_instance(group_name)
525 527
526 528 obj = db.UserGroupRepoGroupToPerm.query() \
527 529 .filter(db.UserGroupRepoGroupToPerm.group == repo_group) \
528 530 .filter(db.UserGroupRepoGroupToPerm.users_group == group_name) \
529 531 .scalar()
530 532 if obj is not None:
531 533 meta.Session().delete(obj)
532 534 log.debug('Revoked perm to %s on %s', repo_group, group_name)
@@ -1,51 +1,59 b''
1 1 ## -*- coding: utf-8 -*-
2 2 ${h.form(url('update_repos_group',group_name=c.repo_group.group_name))}
3 3 <div class="form">
4 4 <div class="form-group">
5 5 <label class="control-label" for="group_name">${_('Group name')}:</label>
6 6 <div>
7 7 ${h.text('group_name',class_='form-control')}
8 8 </div>
9 9 </div>
10 10
11 11 <div class="form-group">
12 <label class="control-label" for="owner">${_('Owner')}:</label>
13 <div>
14 ${h.text('owner',class_='form-control', placeholder=_('Type name of user'))}
15 </div>
16 </div>
17
18 <div class="form-group">
12 19 <label class="control-label" for="group_description">${_('Description')}:</label>
13 20 <div>
14 21 ${h.textarea('group_description',cols=23,rows=5,class_='form-control')}
15 22 </div>
16 23 </div>
17 24
18 25 <div class="form-group">
19 26 <label class="control-label" for="parent_group_id">${_('Group parent')}:</label>
20 27 <div>
21 28 ${h.select('parent_group_id','',c.repo_groups,class_='form-control')}
22 29 </div>
23 30 </div>
24 31
25 32 <div class="form-group">
26 33 <div class="buttons">
27 34 ${h.submit('save',_('Save'),class_="btn btn-default")}
28 35 ${h.reset('reset',_('Reset'),class_="btn btn-default")}
29 36 </div>
30 37 </div>
31 38 </div>
32 39 ${h.end_form()}
33 40
34 41 ${h.form(url('delete_repo_group', group_name=c.repo_group.group_name))}
35 42 <div class="form">
36 43 <div class="form-group">
37 44 <div class="buttons">
38 45 ${h.submit('remove_%s' % c.repo_group.group_name,_('Remove this group'),class_="btn btn-danger",onclick="return confirm('"+_('Confirm to delete this group')+"');")}
39 46 </div>
40 47 </div>
41 48 </div>
42 49 ${h.end_form()}
43 50
44 51 <script>
45 52 'use strict';
46 53 $(document).ready(function(){
47 54 $("#parent_group_id").select2({
48 55 'dropdownAutoWidth': true
49 56 });
57 SimpleUserAutoComplete($('#owner'));
50 58 });
51 59 </script>
@@ -1,2921 +1,2921 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14
15 15 """
16 16 Tests for the JSON-RPC web api.
17 17 """
18 18
19 19 import datetime
20 20 import os
21 21 import random
22 22 import re
23 23 import string
24 24 from typing import Sized
25 25
26 26 import mock
27 27 import pytest
28 28 from webtest import TestApp
29 29
30 30 from kallithea.lib import ext_json
31 31 from kallithea.lib.auth import AuthUser
32 32 from kallithea.lib.utils2 import ascii_bytes
33 33 from kallithea.model import db, meta
34 34 from kallithea.model.changeset_status import ChangesetStatusModel
35 35 from kallithea.model.gist import GistModel
36 36 from kallithea.model.pull_request import PullRequestModel
37 37 from kallithea.model.repo import RepoModel
38 38 from kallithea.model.repo_group import RepoGroupModel
39 39 from kallithea.model.scm import ScmModel
40 40 from kallithea.model.user import UserModel
41 41 from kallithea.model.user_group import UserGroupModel
42 42 from kallithea.tests import base
43 43 from kallithea.tests.fixture import Fixture, raise_exception
44 44
45 45
46 46 API_URL = '/_admin/api'
47 47 TEST_USER_GROUP = 'test_user_group'
48 48 TEST_REPO_GROUP = 'test_repo_group'
49 49
50 50 fixture = Fixture()
51 51
52 52
53 53 def _build_data(apikey, method, **kw):
54 54 """
55 55 Builds API data with given random ID
56 56 For convenience, the json is returned as str
57 57 """
58 58 random_id = random.randrange(1, 9999)
59 59 return random_id, ext_json.dumps({
60 60 "id": random_id,
61 61 "api_key": apikey,
62 62 "method": method,
63 63 "args": kw
64 64 })
65 65
66 66
67 67 jsonify = lambda obj: ext_json.loads(ext_json.dumps(obj))
68 68
69 69
70 70 def api_call(test_obj, params):
71 71 response = test_obj.app.post(API_URL, content_type='application/json',
72 72 params=params)
73 73 return response
74 74
75 75
76 76 ## helpers
77 77 def make_user_group(name=TEST_USER_GROUP):
78 78 gr = fixture.create_user_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
79 79 UserGroupModel().add_user_to_group(user_group=gr,
80 80 user=base.TEST_USER_ADMIN_LOGIN)
81 81 meta.Session().commit()
82 82 return gr
83 83
84 84
85 85 def make_repo_group(name=TEST_REPO_GROUP):
86 86 gr = fixture.create_repo_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
87 87 meta.Session().commit()
88 88 return gr
89 89
90 90
91 91 class _BaseTestApi(object):
92 92 app: TestApp # assigned by app_fixture in subclass TestController mixin
93 93 # assigned in subclass:
94 94 REPO: str
95 95 REPO_TYPE: str
96 96 TEST_REVISION: str
97 97 TEST_PR_SRC: str
98 98 TEST_PR_DST: str
99 99 TEST_PR_REVISIONS: Sized
100 100
101 101 @classmethod
102 102 def setup_class(cls):
103 103 cls.usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
104 104 cls.apikey = cls.usr.api_key
105 105 cls.test_user = UserModel().create_or_update(
106 106 username='test-api',
107 107 password='test',
108 108 email='test@example.com',
109 109 firstname='first',
110 110 lastname='last'
111 111 )
112 112 meta.Session().commit()
113 113 cls.TEST_USER_LOGIN = cls.test_user.username
114 114 cls.apikey_regular = cls.test_user.api_key
115 115
116 116 @classmethod
117 117 def teardown_class(cls):
118 118 pass
119 119
120 120 def setup_method(self, method):
121 121 make_user_group()
122 122 make_repo_group()
123 123
124 124 def teardown_method(self, method):
125 125 fixture.destroy_user_group(TEST_USER_GROUP)
126 126 fixture.destroy_gists()
127 127 fixture.destroy_repo_group(TEST_REPO_GROUP)
128 128
129 129 def _compare_ok(self, id_, expected, given):
130 130 expected = jsonify({
131 131 'id': id_,
132 132 'error': None,
133 133 'result': expected
134 134 })
135 135 given = ext_json.loads(given)
136 136 assert expected == given, (expected, given)
137 137
138 138 def _compare_error(self, id_, expected, given):
139 139 expected = jsonify({
140 140 'id': id_,
141 141 'error': expected,
142 142 'result': None
143 143 })
144 144 given = ext_json.loads(given)
145 145 assert expected == given, (expected, given)
146 146
147 147 def test_api_wrong_key(self):
148 148 id_, params = _build_data('trololo', 'get_user')
149 149 response = api_call(self, params)
150 150
151 151 expected = 'Invalid API key'
152 152 self._compare_error(id_, expected, given=response.body)
153 153
154 154 def test_api_missing_non_optional_param(self):
155 155 id_, params = _build_data(self.apikey, 'get_repo')
156 156 response = api_call(self, params)
157 157
158 158 expected = 'Missing non optional `repoid` arg in JSON DATA'
159 159 self._compare_error(id_, expected, given=response.body)
160 160
161 161 def test_api_missing_non_optional_param_args_null(self):
162 162 id_, params = _build_data(self.apikey, 'get_repo')
163 163 params = params.replace('"args": {}', '"args": null')
164 164 response = api_call(self, params)
165 165
166 166 expected = 'Missing non optional `repoid` arg in JSON DATA'
167 167 self._compare_error(id_, expected, given=response.body)
168 168
169 169 def test_api_missing_non_optional_param_args_bad(self):
170 170 id_, params = _build_data(self.apikey, 'get_repo')
171 171 params = params.replace('"args": {}', '"args": 1')
172 172 response = api_call(self, params)
173 173
174 174 expected = 'Missing non optional `repoid` arg in JSON DATA'
175 175 self._compare_error(id_, expected, given=response.body)
176 176
177 177 def test_api_args_is_null(self):
178 178 id_, params = _build_data(self.apikey, 'get_users', )
179 179 params = params.replace('"args": {}', '"args": null')
180 180 response = api_call(self, params)
181 181 assert response.status == '200 OK'
182 182
183 183 def test_api_args_is_bad(self):
184 184 id_, params = _build_data(self.apikey, 'get_users', )
185 185 params = params.replace('"args": {}', '"args": 1')
186 186 response = api_call(self, params)
187 187 assert response.status == '200 OK'
188 188
189 189 def test_api_args_different_args(self):
190 190 expected = {
191 191 'ascii_letters': string.ascii_letters,
192 192 'ws': string.whitespace,
193 193 'printables': string.printable
194 194 }
195 195 id_, params = _build_data(self.apikey, 'test', args=expected)
196 196 response = api_call(self, params)
197 197 assert response.status == '200 OK'
198 198 self._compare_ok(id_, expected, response.body)
199 199
200 200 def test_api_get_users(self):
201 201 id_, params = _build_data(self.apikey, 'get_users', )
202 202 response = api_call(self, params)
203 203 ret_all = []
204 204 _users = db.User.query().filter_by(is_default_user=False) \
205 205 .order_by(db.User.username).all()
206 206 for usr in _users:
207 207 ret = usr.get_api_data()
208 208 ret_all.append(jsonify(ret))
209 209 expected = ret_all
210 210 self._compare_ok(id_, expected, given=response.body)
211 211
212 212 def test_api_get_user(self):
213 213 id_, params = _build_data(self.apikey, 'get_user',
214 214 userid=base.TEST_USER_ADMIN_LOGIN)
215 215 response = api_call(self, params)
216 216
217 217 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
218 218 ret = usr.get_api_data()
219 219 ret['permissions'] = AuthUser(dbuser=usr).permissions
220 220
221 221 expected = ret
222 222 self._compare_ok(id_, expected, given=response.body)
223 223
224 224 def test_api_get_user_that_does_not_exist(self):
225 225 id_, params = _build_data(self.apikey, 'get_user',
226 226 userid='trololo')
227 227 response = api_call(self, params)
228 228
229 229 expected = "user `%s` does not exist" % 'trololo'
230 230 self._compare_error(id_, expected, given=response.body)
231 231
232 232 def test_api_get_user_without_giving_userid(self):
233 233 id_, params = _build_data(self.apikey, 'get_user')
234 234 response = api_call(self, params)
235 235
236 236 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
237 237 ret = usr.get_api_data()
238 238 ret['permissions'] = AuthUser(dbuser=usr).permissions
239 239
240 240 expected = ret
241 241 self._compare_ok(id_, expected, given=response.body)
242 242
243 243 def test_api_get_user_without_giving_userid_non_admin(self):
244 244 id_, params = _build_data(self.apikey_regular, 'get_user')
245 245 response = api_call(self, params)
246 246
247 247 usr = db.User.get_by_username(self.TEST_USER_LOGIN)
248 248 ret = usr.get_api_data()
249 249 ret['permissions'] = AuthUser(dbuser=usr).permissions
250 250
251 251 expected = ret
252 252 self._compare_ok(id_, expected, given=response.body)
253 253
254 254 def test_api_get_user_with_giving_userid_non_admin(self):
255 255 id_, params = _build_data(self.apikey_regular, 'get_user',
256 256 userid=self.TEST_USER_LOGIN)
257 257 response = api_call(self, params)
258 258
259 259 expected = 'userid is not the same as your user'
260 260 self._compare_error(id_, expected, given=response.body)
261 261
262 262 def test_api_pull_remote(self):
263 263 # Note: pulling from local repos is a misfeature - it will bypass access control
264 264 # ... but ok, if the path already has been set in the database
265 265 repo_name = 'test_pull'
266 266 r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
267 267 # hack around that clone_uri can't be set to to a local path
268 268 # (as shown by test_api_create_repo_clone_uri_local)
269 269 r.clone_uri = os.path.join(db.Ui.get_by_key('paths', '/').ui_value, self.REPO)
270 270 meta.Session().commit()
271 271
272 272 pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == repo_name)]
273 273
274 274 id_, params = _build_data(self.apikey, 'pull',
275 275 repoid=repo_name,)
276 276 response = api_call(self, params)
277 277
278 278 expected = {'msg': 'Pulled from `%s`' % repo_name,
279 279 'repository': repo_name}
280 280 self._compare_ok(id_, expected, given=response.body)
281 281
282 282 post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == repo_name)]
283 283
284 284 fixture.destroy_repo(repo_name)
285 285
286 286 assert pre_cached_tip != post_cached_tip
287 287
288 288 def test_api_pull_fork(self):
289 289 fork_name = 'fork'
290 290 fixture.create_fork(self.REPO, fork_name)
291 291 id_, params = _build_data(self.apikey, 'pull',
292 292 repoid=fork_name,)
293 293 response = api_call(self, params)
294 294
295 295 expected = {'msg': 'Pulled from `%s`' % fork_name,
296 296 'repository': fork_name}
297 297 self._compare_ok(id_, expected, given=response.body)
298 298
299 299 fixture.destroy_repo(fork_name)
300 300
301 301 def test_api_pull_error_no_remote_no_fork(self):
302 302 # should fail because no clone_uri is set
303 303 id_, params = _build_data(self.apikey, 'pull',
304 304 repoid=self.REPO, )
305 305 response = api_call(self, params)
306 306
307 307 expected = 'Unable to pull changes from `%s`' % self.REPO
308 308 self._compare_error(id_, expected, given=response.body)
309 309
310 310 def test_api_pull_custom_remote(self):
311 311 repo_name = 'test_pull_custom_remote'
312 312 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
313 313
314 314 custom_remote_path = os.path.join(db.Ui.get_by_key('paths', '/').ui_value, self.REPO)
315 315
316 316 id_, params = _build_data(self.apikey, 'pull',
317 317 repoid=repo_name,
318 318 clone_uri=custom_remote_path)
319 319 response = api_call(self, params)
320 320
321 321 expected = {'msg': 'Pulled from `%s`' % repo_name,
322 322 'repository': repo_name}
323 323 self._compare_ok(id_, expected, given=response.body)
324 324
325 325 fixture.destroy_repo(repo_name)
326 326
327 327 def test_api_rescan_repos(self):
328 328 id_, params = _build_data(self.apikey, 'rescan_repos')
329 329 response = api_call(self, params)
330 330
331 331 expected = {'added': [], 'removed': []}
332 332 self._compare_ok(id_, expected, given=response.body)
333 333
334 334 @mock.patch.object(ScmModel, 'repo_scan', raise_exception)
335 335 def test_api_rescann_error(self):
336 336 id_, params = _build_data(self.apikey, 'rescan_repos', )
337 337 response = api_call(self, params)
338 338
339 339 expected = 'Error occurred during rescan repositories action'
340 340 self._compare_error(id_, expected, given=response.body)
341 341
342 342 def test_api_create_existing_user(self):
343 343 id_, params = _build_data(self.apikey, 'create_user',
344 344 username=base.TEST_USER_ADMIN_LOGIN,
345 345 email='test@example.com',
346 346 password='trololo')
347 347 response = api_call(self, params)
348 348
349 349 expected = "user `%s` already exist" % base.TEST_USER_ADMIN_LOGIN
350 350 self._compare_error(id_, expected, given=response.body)
351 351
352 352 def test_api_create_user_with_existing_email(self):
353 353 id_, params = _build_data(self.apikey, 'create_user',
354 354 username=base.TEST_USER_ADMIN_LOGIN + 'new',
355 355 email=base.TEST_USER_REGULAR_EMAIL,
356 356 password='trololo')
357 357 response = api_call(self, params)
358 358
359 359 expected = "email `%s` already exist" % base.TEST_USER_REGULAR_EMAIL
360 360 self._compare_error(id_, expected, given=response.body)
361 361
362 362 def test_api_create_user(self):
363 363 username = 'test_new_api_user'
364 364 email = username + "@example.com"
365 365
366 366 id_, params = _build_data(self.apikey, 'create_user',
367 367 username=username,
368 368 email=email,
369 369 password='trololo')
370 370 response = api_call(self, params)
371 371
372 372 usr = db.User.get_by_username(username)
373 373 ret = dict(
374 374 msg='created new user `%s`' % username,
375 375 user=jsonify(usr.get_api_data())
376 376 )
377 377
378 378 try:
379 379 expected = ret
380 380 self._compare_ok(id_, expected, given=response.body)
381 381 finally:
382 382 fixture.destroy_user(usr.user_id)
383 383
384 384 def test_api_create_user_without_password(self):
385 385 username = 'test_new_api_user_passwordless'
386 386 email = username + "@example.com"
387 387
388 388 id_, params = _build_data(self.apikey, 'create_user',
389 389 username=username,
390 390 email=email)
391 391 response = api_call(self, params)
392 392
393 393 usr = db.User.get_by_username(username)
394 394 ret = dict(
395 395 msg='created new user `%s`' % username,
396 396 user=jsonify(usr.get_api_data())
397 397 )
398 398 try:
399 399 expected = ret
400 400 self._compare_ok(id_, expected, given=response.body)
401 401 finally:
402 402 fixture.destroy_user(usr.user_id)
403 403
404 404 def test_api_create_user_with_extern_name(self):
405 405 username = 'test_new_api_user_passwordless'
406 406 email = username + "@example.com"
407 407
408 408 id_, params = _build_data(self.apikey, 'create_user',
409 409 username=username,
410 410 email=email, extern_name='internal')
411 411 response = api_call(self, params)
412 412
413 413 usr = db.User.get_by_username(username)
414 414 ret = dict(
415 415 msg='created new user `%s`' % username,
416 416 user=jsonify(usr.get_api_data())
417 417 )
418 418 try:
419 419 expected = ret
420 420 self._compare_ok(id_, expected, given=response.body)
421 421 finally:
422 422 fixture.destroy_user(usr.user_id)
423 423
424 424 @mock.patch.object(UserModel, 'create_or_update', raise_exception)
425 425 def test_api_create_user_when_exception_happened(self):
426 426
427 427 username = 'test_new_api_user'
428 428 email = username + "@example.com"
429 429
430 430 id_, params = _build_data(self.apikey, 'create_user',
431 431 username=username,
432 432 email=email,
433 433 password='trololo')
434 434 response = api_call(self, params)
435 435 expected = 'failed to create user `%s`' % username
436 436 self._compare_error(id_, expected, given=response.body)
437 437
438 438 def test_api_delete_user(self):
439 439 usr = UserModel().create_or_update(username='test_user',
440 440 password='qweqwe',
441 441 email='u232@example.com',
442 442 firstname='u1', lastname='u1')
443 443 meta.Session().commit()
444 444 username = usr.username
445 445 email = usr.email
446 446 usr_id = usr.user_id
447 447 ## DELETE THIS USER NOW
448 448
449 449 id_, params = _build_data(self.apikey, 'delete_user',
450 450 userid=username, )
451 451 response = api_call(self, params)
452 452
453 453 ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
454 454 'user': None}
455 455 expected = ret
456 456 self._compare_ok(id_, expected, given=response.body)
457 457
458 458 @mock.patch.object(UserModel, 'delete', raise_exception)
459 459 def test_api_delete_user_when_exception_happened(self):
460 460 usr = UserModel().create_or_update(username='test_user',
461 461 password='qweqwe',
462 462 email='u232@example.com',
463 463 firstname='u1', lastname='u1')
464 464 meta.Session().commit()
465 465 username = usr.username
466 466
467 467 id_, params = _build_data(self.apikey, 'delete_user',
468 468 userid=username, )
469 469 response = api_call(self, params)
470 470 ret = 'failed to delete user ID:%s %s' % (usr.user_id,
471 471 usr.username)
472 472 expected = ret
473 473 self._compare_error(id_, expected, given=response.body)
474 474
475 475 @base.parametrize('name,expected', [
476 476 ('firstname', 'new_username'),
477 477 ('lastname', 'new_username'),
478 478 ('email', 'new_username'),
479 479 ('admin', True),
480 480 ('admin', False),
481 481 ('extern_type', 'ldap'),
482 482 ('extern_type', None),
483 483 ('extern_name', 'test'),
484 484 ('extern_name', None),
485 485 ('active', False),
486 486 ('active', True),
487 487 ('password', 'newpass'),
488 488 ])
489 489 def test_api_update_user(self, name, expected):
490 490 usr = db.User.get_by_username(self.TEST_USER_LOGIN)
491 491 kw = {name: expected,
492 492 'userid': usr.user_id}
493 493 id_, params = _build_data(self.apikey, 'update_user', **kw)
494 494 response = api_call(self, params)
495 495
496 496 ret = {
497 497 'msg': 'updated user ID:%s %s' % (
498 498 usr.user_id, self.TEST_USER_LOGIN),
499 499 'user': jsonify(db.User \
500 500 .get_by_username(self.TEST_USER_LOGIN) \
501 501 .get_api_data())
502 502 }
503 503
504 504 expected = ret
505 505 self._compare_ok(id_, expected, given=response.body)
506 506
507 507 def test_api_update_user_no_changed_params(self):
508 508 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
509 509 ret = jsonify(usr.get_api_data())
510 510 id_, params = _build_data(self.apikey, 'update_user',
511 511 userid=base.TEST_USER_ADMIN_LOGIN)
512 512
513 513 response = api_call(self, params)
514 514 ret = {
515 515 'msg': 'updated user ID:%s %s' % (
516 516 usr.user_id, base.TEST_USER_ADMIN_LOGIN),
517 517 'user': ret
518 518 }
519 519 expected = ret
520 520 self._compare_ok(id_, expected, given=response.body)
521 521
522 522 def test_api_update_user_by_user_id(self):
523 523 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
524 524 ret = jsonify(usr.get_api_data())
525 525 id_, params = _build_data(self.apikey, 'update_user',
526 526 userid=usr.user_id)
527 527
528 528 response = api_call(self, params)
529 529 ret = {
530 530 'msg': 'updated user ID:%s %s' % (
531 531 usr.user_id, base.TEST_USER_ADMIN_LOGIN),
532 532 'user': ret
533 533 }
534 534 expected = ret
535 535 self._compare_ok(id_, expected, given=response.body)
536 536
537 537 def test_api_update_user_default_user(self):
538 538 usr = db.User.get_default_user()
539 539 id_, params = _build_data(self.apikey, 'update_user',
540 540 userid=usr.user_id)
541 541
542 542 response = api_call(self, params)
543 543 expected = 'editing default user is forbidden'
544 544 self._compare_error(id_, expected, given=response.body)
545 545
546 546 @mock.patch.object(UserModel, 'update_user', raise_exception)
547 547 def test_api_update_user_when_exception_happens(self):
548 548 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
549 549 ret = jsonify(usr.get_api_data())
550 550 id_, params = _build_data(self.apikey, 'update_user',
551 551 userid=usr.user_id)
552 552
553 553 response = api_call(self, params)
554 554 ret = 'failed to update user `%s`' % usr.user_id
555 555
556 556 expected = ret
557 557 self._compare_error(id_, expected, given=response.body)
558 558
559 559 def test_api_get_repo(self):
560 560 new_group = 'some_new_group'
561 561 make_user_group(new_group)
562 562 RepoModel().grant_user_group_permission(repo=self.REPO,
563 563 group_name=new_group,
564 564 perm='repository.read')
565 565 meta.Session().commit()
566 566 id_, params = _build_data(self.apikey, 'get_repo',
567 567 repoid=self.REPO)
568 568 response = api_call(self, params)
569 569 assert "tags" not in response.json['result']
570 570 assert 'pull_requests' not in response.json['result']
571 571
572 572 repo = RepoModel().get_by_repo_name(self.REPO)
573 573 ret = repo.get_api_data()
574 574
575 575 members = []
576 576 followers = []
577 577 for user in repo.repo_to_perm:
578 578 perm = user.permission.permission_name
579 579 user = user.user
580 580 user_data = {'name': user.username, 'type': "user",
581 581 'permission': perm}
582 582 members.append(user_data)
583 583
584 584 for user_group in repo.users_group_to_perm:
585 585 perm = user_group.permission.permission_name
586 586 user_group = user_group.users_group
587 587 user_group_data = {'name': user_group.users_group_name,
588 588 'type': "user_group", 'permission': perm}
589 589 members.append(user_group_data)
590 590
591 591 for user in repo.followers:
592 592 followers.append(user.user.get_api_data())
593 593
594 594 ret['members'] = members
595 595 ret['followers'] = followers
596 596
597 597 expected = ret
598 598 self._compare_ok(id_, expected, given=response.body)
599 599 fixture.destroy_user_group(new_group)
600 600
601 601 id_, params = _build_data(self.apikey, 'get_repo', repoid=self.REPO,
602 602 with_revision_names=True,
603 603 with_pullrequests=True)
604 604 response = api_call(self, params)
605 605 assert "v0.2.0" in response.json['result']['tags']
606 606 assert 'pull_requests' in response.json['result']
607 607
608 608 @base.parametrize('grant_perm', [
609 609 ('repository.admin'),
610 610 ('repository.write'),
611 611 ('repository.read'),
612 612 ])
613 613 def test_api_get_repo_by_non_admin(self, grant_perm):
614 614 RepoModel().grant_user_permission(repo=self.REPO,
615 615 user=self.TEST_USER_LOGIN,
616 616 perm=grant_perm)
617 617 meta.Session().commit()
618 618 id_, params = _build_data(self.apikey_regular, 'get_repo',
619 619 repoid=self.REPO)
620 620 response = api_call(self, params)
621 621
622 622 repo = RepoModel().get_by_repo_name(self.REPO)
623 623 assert len(repo.repo_to_perm) >= 2 # make sure we actually are testing something - probably the default 2 permissions, possibly more
624 624
625 625 expected = repo.get_api_data()
626 626
627 627 members = []
628 628 for user in repo.repo_to_perm:
629 629 perm = user.permission.permission_name
630 630 user_obj = user.user
631 631 user_data = {'name': user_obj.username, 'type': "user",
632 632 'permission': perm}
633 633 members.append(user_data)
634 634 for user_group in repo.users_group_to_perm:
635 635 perm = user_group.permission.permission_name
636 636 user_group_obj = user_group.users_group
637 637 user_group_data = {'name': user_group_obj.users_group_name,
638 638 'type': "user_group", 'permission': perm}
639 639 members.append(user_group_data)
640 640 expected['members'] = members
641 641
642 642 followers = []
643 643
644 644 for user in repo.followers:
645 645 followers.append(user.user.get_api_data())
646 646
647 647 expected['followers'] = followers
648 648
649 649 try:
650 650 self._compare_ok(id_, expected, given=response.body)
651 651 finally:
652 652 RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
653 653
654 654 def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
655 655 RepoModel().grant_user_permission(repo=self.REPO,
656 656 user=db.User.DEFAULT_USER_NAME,
657 657 perm='repository.none')
658 658 try:
659 659 RepoModel().grant_user_permission(repo=self.REPO,
660 660 user=self.TEST_USER_LOGIN,
661 661 perm='repository.none')
662 662
663 663 id_, params = _build_data(self.apikey_regular, 'get_repo',
664 664 repoid=self.REPO)
665 665 response = api_call(self, params)
666 666
667 667 expected = 'repository `%s` does not exist' % (self.REPO)
668 668 self._compare_error(id_, expected, given=response.body)
669 669 finally:
670 670 RepoModel().grant_user_permission(repo=self.REPO,
671 671 user=db.User.DEFAULT_USER_NAME,
672 672 perm='repository.read')
673 673
674 674 def test_api_get_repo_that_doesn_not_exist(self):
675 675 id_, params = _build_data(self.apikey, 'get_repo',
676 676 repoid='no-such-repo')
677 677 response = api_call(self, params)
678 678
679 679 ret = 'repository `%s` does not exist' % 'no-such-repo'
680 680 expected = ret
681 681 self._compare_error(id_, expected, given=response.body)
682 682
683 683 def test_api_get_repos(self):
684 684 id_, params = _build_data(self.apikey, 'get_repos')
685 685 response = api_call(self, params)
686 686
687 687 expected = jsonify([
688 688 repo.get_api_data()
689 689 for repo in db.Repository.query()
690 690 ])
691 691
692 692 self._compare_ok(id_, expected, given=response.body)
693 693
694 694 def test_api_get_repos_non_admin(self):
695 695 id_, params = _build_data(self.apikey_regular, 'get_repos')
696 696 response = api_call(self, params)
697 697
698 698 expected = jsonify([
699 699 repo.get_api_data()
700 700 for repo in AuthUser(dbuser=db.User.get_by_username(self.TEST_USER_LOGIN)).get_all_user_repos()
701 701 ])
702 702
703 703 self._compare_ok(id_, expected, given=response.body)
704 704
705 705 @base.parametrize('name,ret_type', [
706 706 ('all', 'all'),
707 707 ('dirs', 'dirs'),
708 708 ('files', 'files'),
709 709 ])
710 710 def test_api_get_repo_nodes(self, name, ret_type):
711 711 rev = 'tip'
712 712 path = '/'
713 713 id_, params = _build_data(self.apikey, 'get_repo_nodes',
714 714 repoid=self.REPO, revision=rev,
715 715 root_path=path,
716 716 ret_type=ret_type)
717 717 response = api_call(self, params)
718 718
719 719 # we don't the actual return types here since it's tested somewhere
720 720 # else
721 721 expected = response.json['result']
722 722 self._compare_ok(id_, expected, given=response.body)
723 723
724 724 def test_api_get_repo_nodes_bad_revisions(self):
725 725 rev = 'i-dont-exist'
726 726 path = '/'
727 727 id_, params = _build_data(self.apikey, 'get_repo_nodes',
728 728 repoid=self.REPO, revision=rev,
729 729 root_path=path, )
730 730 response = api_call(self, params)
731 731
732 732 expected = 'failed to get repo: `%s` nodes' % self.REPO
733 733 self._compare_error(id_, expected, given=response.body)
734 734
735 735 def test_api_get_repo_nodes_bad_path(self):
736 736 rev = 'tip'
737 737 path = '/idontexits'
738 738 id_, params = _build_data(self.apikey, 'get_repo_nodes',
739 739 repoid=self.REPO, revision=rev,
740 740 root_path=path, )
741 741 response = api_call(self, params)
742 742
743 743 expected = 'failed to get repo: `%s` nodes' % self.REPO
744 744 self._compare_error(id_, expected, given=response.body)
745 745
746 746 def test_api_get_repo_nodes_bad_ret_type(self):
747 747 rev = 'tip'
748 748 path = '/'
749 749 ret_type = 'error'
750 750 id_, params = _build_data(self.apikey, 'get_repo_nodes',
751 751 repoid=self.REPO, revision=rev,
752 752 root_path=path,
753 753 ret_type=ret_type)
754 754 response = api_call(self, params)
755 755
756 756 expected = ('ret_type must be one of %s'
757 757 % (','.join(sorted(['files', 'dirs', 'all']))))
758 758 self._compare_error(id_, expected, given=response.body)
759 759
760 760 @base.parametrize('name,ret_type,grant_perm', [
761 761 ('all', 'all', 'repository.write'),
762 762 ('dirs', 'dirs', 'repository.admin'),
763 763 ('files', 'files', 'repository.read'),
764 764 ])
765 765 def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
766 766 RepoModel().grant_user_permission(repo=self.REPO,
767 767 user=self.TEST_USER_LOGIN,
768 768 perm=grant_perm)
769 769 meta.Session().commit()
770 770
771 771 rev = 'tip'
772 772 path = '/'
773 773 id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
774 774 repoid=self.REPO, revision=rev,
775 775 root_path=path,
776 776 ret_type=ret_type)
777 777 response = api_call(self, params)
778 778
779 779 # we don't the actual return types here since it's tested somewhere
780 780 # else
781 781 expected = response.json['result']
782 782 try:
783 783 self._compare_ok(id_, expected, given=response.body)
784 784 finally:
785 785 RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
786 786
787 787 @base.parametrize('changing_attr,updates', [
788 788 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
789 789 ('description', {'description': 'new description'}),
790 790 ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
791 791 ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control
792 792 ('clone_uri', {'clone_uri': None}),
793 793 ('landing_rev', {'landing_rev': 'branch:master'}),
794 794 ('private', {'private': True}),
795 795 ('enable_statistics', {'enable_statistics': True}),
796 796 ('enable_downloads', {'enable_downloads': True}),
797 797 ('repo_group', {'group': 'test_group_for_update'}),
798 798 ])
799 799 def test_api_create_repo(self, changing_attr, updates):
800 800 repo_name = repo_name_full = 'new_repo'
801 801
802 802 if changing_attr == 'repo_group':
803 803 group_name = updates['group']
804 804 fixture.create_repo_group(group_name)
805 805 repo_name_full = '/'.join([group_name, repo_name])
806 806 updates = {}
807 807
808 808 id_, params = _build_data(self.apikey, 'create_repo',
809 809 repo_type=self.REPO_TYPE, repo_name=repo_name_full, **updates)
810 810 response = api_call(self, params)
811 811
812 812 try:
813 813 expected = {
814 814 'msg': 'Created new repository `%s`' % repo_name_full,
815 815 'success': True}
816 816 if changing_attr == 'clone_uri' and updates['clone_uri']:
817 817 expected = 'failed to create repository `%s`' % repo_name
818 818 self._compare_error(id_, expected, given=response.body)
819 819 return
820 820 else:
821 821 self._compare_ok(id_, expected, given=response.body)
822 822
823 823 repo = db.Repository.get_by_repo_name(repo_name_full)
824 824 assert repo is not None
825 825
826 826 expected_data = {
827 827 'clone_uri': None,
828 828 'created_on': repo.created_on,
829 829 'description': repo_name,
830 830 'enable_downloads': False,
831 831 'enable_statistics': False,
832 832 'fork_of': None,
833 833 'landing_rev': ['rev', 'tip'],
834 834 'last_changeset': {'author': '',
835 835 'date': datetime.datetime(1970, 1, 1, 0, 0),
836 836 'message': '',
837 837 'raw_id': '0000000000000000000000000000000000000000',
838 838 'revision': -1,
839 839 'short_id': '000000000000'},
840 840 'owner': 'test_admin',
841 841 'private': False,
842 842 'repo_id': repo.repo_id,
843 843 'repo_name': repo_name_full,
844 844 'repo_type': self.REPO_TYPE,
845 845 }
846 846 expected_data.update(updates)
847 847 if changing_attr == 'landing_rev':
848 848 expected_data['landing_rev'] = expected_data['landing_rev'].split(':', 1)
849 849 assert repo.get_api_data() == expected_data
850 850 finally:
851 851 fixture.destroy_repo(repo_name_full)
852 852 if changing_attr == 'repo_group':
853 853 fixture.destroy_repo_group(group_name)
854 854
855 855 @base.parametrize('repo_name', [
856 856 '',
857 857 '.',
858 858 '..',
859 859 ':',
860 860 '/',
861 861 '<test>',
862 862 ])
863 863 def test_api_create_repo_bad_names(self, repo_name):
864 864 id_, params = _build_data(self.apikey, 'create_repo',
865 865 repo_name=repo_name,
866 866 owner=base.TEST_USER_ADMIN_LOGIN,
867 867 repo_type=self.REPO_TYPE,
868 868 )
869 869 response = api_call(self, params)
870 870 if repo_name == '/':
871 871 expected = "repo group `` not found"
872 872 self._compare_error(id_, expected, given=response.body)
873 873 else:
874 874 expected = "failed to create repository `%s`" % repo_name
875 875 self._compare_error(id_, expected, given=response.body)
876 876 fixture.destroy_repo(repo_name)
877 877
878 878 def test_api_create_repo_clone_uri_local(self):
879 879 # cloning from local repos was a misfeature - it would bypass access control
880 880 # TODO: introduce other test coverage of actual remote cloning
881 881 clone_uri = os.path.join(base.TESTS_TMP_PATH, self.REPO)
882 882 repo_name = 'api-repo'
883 883 id_, params = _build_data(self.apikey, 'create_repo',
884 884 repo_name=repo_name,
885 885 owner=base.TEST_USER_ADMIN_LOGIN,
886 886 repo_type=self.REPO_TYPE,
887 887 clone_uri=clone_uri,
888 888 )
889 889 response = api_call(self, params)
890 890 expected = "failed to create repository `%s`" % repo_name
891 891 self._compare_error(id_, expected, given=response.body)
892 892 fixture.destroy_repo(repo_name)
893 893
894 894 def test_api_create_repo_and_repo_group(self):
895 895 repo_group_name = 'my_gr'
896 896 repo_name = '%s/api-repo' % repo_group_name
897 897
898 898 # repo creation can no longer also create repo group
899 899 id_, params = _build_data(self.apikey, 'create_repo',
900 900 repo_name=repo_name,
901 901 owner=base.TEST_USER_ADMIN_LOGIN,
902 902 repo_type=self.REPO_TYPE,)
903 903 response = api_call(self, params)
904 904 expected = 'repo group `%s` not found' % repo_group_name
905 905 self._compare_error(id_, expected, given=response.body)
906 906 assert RepoModel().get_by_repo_name(repo_name) is None
907 907
908 908 # create group before creating repo
909 909 rg = fixture.create_repo_group(repo_group_name)
910 910 meta.Session().commit()
911 911
912 912 id_, params = _build_data(self.apikey, 'create_repo',
913 913 repo_name=repo_name,
914 914 owner=base.TEST_USER_ADMIN_LOGIN,
915 915 repo_type=self.REPO_TYPE,)
916 916 response = api_call(self, params)
917 917 expected = {
918 918 'msg': 'Created new repository `%s`' % repo_name,
919 919 'success': True,
920 920 }
921 921 self._compare_ok(id_, expected, given=response.body)
922 922 repo = RepoModel().get_by_repo_name(repo_name)
923 923 assert repo is not None
924 924
925 925 fixture.destroy_repo(repo_name)
926 926 fixture.destroy_repo_group(repo_group_name)
927 927
928 928 def test_api_create_repo_in_repo_group_without_permission(self):
929 929 repo_group_basename = 'api-repo-repo'
930 930 repo_group_name = '%s/%s' % (TEST_REPO_GROUP, repo_group_basename)
931 931 repo_name = '%s/api-repo' % repo_group_name
932 932
933 933 top_group = db.RepoGroup.get_by_group_name(TEST_REPO_GROUP)
934 934 assert top_group
935 935 rg = fixture.create_repo_group(repo_group_basename, parent_group_id=top_group)
936 936 meta.Session().commit()
937 937 RepoGroupModel().grant_user_permission(repo_group_name,
938 938 self.TEST_USER_LOGIN,
939 939 'group.none')
940 940 meta.Session().commit()
941 941
942 942 id_, params = _build_data(self.apikey_regular, 'create_repo',
943 943 repo_name=repo_name,
944 944 repo_type=self.REPO_TYPE,
945 945 )
946 946 response = api_call(self, params)
947 947
948 948 # API access control match Web access control:
949 949 expected = 'no permission to create repo in test_repo_group/api-repo-repo'
950 950 self._compare_error(id_, expected, given=response.body)
951 951
952 952 fixture.destroy_repo(repo_name)
953 953 fixture.destroy_repo_group(repo_group_name)
954 954
955 955 def test_api_create_repo_unknown_owner(self):
956 956 repo_name = 'api-repo'
957 957 owner = 'i-dont-exist'
958 958 id_, params = _build_data(self.apikey, 'create_repo',
959 959 repo_name=repo_name,
960 960 owner=owner,
961 961 repo_type=self.REPO_TYPE,
962 962 )
963 963 response = api_call(self, params)
964 964 expected = 'user `%s` does not exist' % owner
965 965 self._compare_error(id_, expected, given=response.body)
966 966
967 967 def test_api_create_repo_dont_specify_owner(self):
968 968 repo_name = 'api-repo'
969 969 owner = 'i-dont-exist'
970 970 id_, params = _build_data(self.apikey, 'create_repo',
971 971 repo_name=repo_name,
972 972 repo_type=self.REPO_TYPE,
973 973 )
974 974 response = api_call(self, params)
975 975
976 976 repo = RepoModel().get_by_repo_name(repo_name)
977 977 assert repo is not None
978 978 ret = {
979 979 'msg': 'Created new repository `%s`' % repo_name,
980 980 'success': True,
981 981 }
982 982 expected = ret
983 983 self._compare_ok(id_, expected, given=response.body)
984 984 fixture.destroy_repo(repo_name)
985 985
986 986 def test_api_create_repo_by_non_admin(self):
987 987 repo_name = 'api-repo'
988 988 owner = 'i-dont-exist'
989 989 id_, params = _build_data(self.apikey_regular, 'create_repo',
990 990 repo_name=repo_name,
991 991 repo_type=self.REPO_TYPE,
992 992 )
993 993 response = api_call(self, params)
994 994
995 995 repo = RepoModel().get_by_repo_name(repo_name)
996 996 assert repo is not None
997 997 ret = {
998 998 'msg': 'Created new repository `%s`' % repo_name,
999 999 'success': True,
1000 1000 }
1001 1001 expected = ret
1002 1002 self._compare_ok(id_, expected, given=response.body)
1003 1003 fixture.destroy_repo(repo_name)
1004 1004
1005 1005 def test_api_create_repo_by_non_admin_specify_owner(self):
1006 1006 repo_name = 'api-repo'
1007 1007 owner = 'i-dont-exist'
1008 1008 id_, params = _build_data(self.apikey_regular, 'create_repo',
1009 1009 repo_name=repo_name,
1010 1010 repo_type=self.REPO_TYPE,
1011 1011 owner=owner)
1012 1012 response = api_call(self, params)
1013 1013
1014 1014 expected = 'Only Kallithea admin can specify `owner` param'
1015 1015 self._compare_error(id_, expected, given=response.body)
1016 1016 fixture.destroy_repo(repo_name)
1017 1017
1018 1018 def test_api_create_repo_exists(self):
1019 1019 repo_name = self.REPO
1020 1020 id_, params = _build_data(self.apikey, 'create_repo',
1021 1021 repo_name=repo_name,
1022 1022 owner=base.TEST_USER_ADMIN_LOGIN,
1023 1023 repo_type=self.REPO_TYPE,)
1024 1024 response = api_call(self, params)
1025 1025 expected = "repo `%s` already exist" % repo_name
1026 1026 self._compare_error(id_, expected, given=response.body)
1027 1027
1028 1028 def test_api_create_repo_dot_dot(self):
1029 1029 # it is only possible to create repositories in existing repo groups - and '..' can't be used
1030 1030 group_name = '%s/..' % TEST_REPO_GROUP
1031 1031 repo_name = '%s/%s' % (group_name, 'could-be-outside')
1032 1032 id_, params = _build_data(self.apikey, 'create_repo',
1033 1033 repo_name=repo_name,
1034 1034 owner=base.TEST_USER_ADMIN_LOGIN,
1035 1035 repo_type=self.REPO_TYPE,)
1036 1036 response = api_call(self, params)
1037 1037 expected = 'repo group `%s` not found' % group_name
1038 1038 self._compare_error(id_, expected, given=response.body)
1039 1039 fixture.destroy_repo(repo_name)
1040 1040
1041 1041 @mock.patch.object(RepoModel, 'create', raise_exception)
1042 1042 def test_api_create_repo_exception_occurred(self):
1043 1043 repo_name = 'api-repo'
1044 1044 id_, params = _build_data(self.apikey, 'create_repo',
1045 1045 repo_name=repo_name,
1046 1046 owner=base.TEST_USER_ADMIN_LOGIN,
1047 1047 repo_type=self.REPO_TYPE,)
1048 1048 response = api_call(self, params)
1049 1049 expected = 'failed to create repository `%s`' % repo_name
1050 1050 self._compare_error(id_, expected, given=response.body)
1051 1051
1052 1052 @base.parametrize('changing_attr,updates', [
1053 1053 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
1054 1054 ('description', {'description': 'new description'}),
1055 1055 ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
1056 1056 ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control
1057 1057 ('clone_uri', {'clone_uri': None}),
1058 1058 ('landing_rev', {'landing_rev': 'branch:master'}),
1059 1059 ('private', {'private': True}),
1060 1060 ('enable_statistics', {'enable_statistics': True}),
1061 1061 ('enable_downloads', {'enable_downloads': True}),
1062 1062 ('name', {'name': 'new_repo_name'}),
1063 1063 ('repo_group', {'group': 'test_group_for_update'}),
1064 1064 ])
1065 1065 def test_api_update_repo(self, changing_attr, updates):
1066 1066 repo_name = 'api_update_me'
1067 1067 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1068 1068 if changing_attr == 'repo_group':
1069 1069 fixture.create_repo_group(updates['group'])
1070 1070
1071 1071 id_, params = _build_data(self.apikey, 'update_repo',
1072 1072 repoid=repo_name, **updates)
1073 1073
1074 1074 if changing_attr == 'name':
1075 1075 repo_name = updates['name']
1076 1076 if changing_attr == 'repo_group':
1077 1077 repo_name = '/'.join([updates['group'], repo_name])
1078 1078 expected = {
1079 1079 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
1080 1080 'repository': repo.get_api_data()
1081 1081 }
1082 1082 expected['repository'].update(updates)
1083 1083 if changing_attr == 'clone_uri' and updates['clone_uri'] is None:
1084 1084 expected['repository']['clone_uri'] = ''
1085 1085 if changing_attr == 'landing_rev':
1086 1086 expected['repository']['landing_rev'] = expected['repository']['landing_rev'].split(':', 1)
1087 1087 if changing_attr == 'name':
1088 1088 expected['repository']['repo_name'] = expected['repository'].pop('name')
1089 1089 if changing_attr == 'repo_group':
1090 1090 expected['repository']['repo_name'] = expected['repository'].pop('group') + '/' + repo.repo_name
1091 1091
1092 1092 response = api_call(self, params)
1093 1093
1094 1094 try:
1095 1095 if changing_attr == 'clone_uri' and updates['clone_uri']:
1096 1096 expected = 'failed to update repo `%s`' % repo_name
1097 1097 self._compare_error(id_, expected, given=response.body)
1098 1098 else:
1099 1099 self._compare_ok(id_, expected, given=response.body)
1100 1100 finally:
1101 1101 fixture.destroy_repo(repo_name)
1102 1102 if changing_attr == 'repo_group':
1103 1103 fixture.destroy_repo_group(updates['group'])
1104 1104
1105 1105 @base.parametrize('changing_attr,updates', [
1106 1106 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
1107 1107 ('description', {'description': 'new description'}),
1108 1108 ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
1109 1109 ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control
1110 1110 ('clone_uri', {'clone_uri': None}),
1111 1111 ('landing_rev', {'landing_rev': 'branch:master'}),
1112 1112 ('enable_statistics', {'enable_statistics': True}),
1113 1113 ('enable_downloads', {'enable_downloads': True}),
1114 1114 ('name', {'name': 'new_repo_name'}),
1115 1115 ('repo_group', {'group': 'test_group_for_update'}),
1116 1116 ])
1117 1117 def test_api_update_group_repo(self, changing_attr, updates):
1118 1118 group_name = 'lololo'
1119 1119 fixture.create_repo_group(group_name)
1120 1120 repo_name = '%s/api_update_me' % group_name
1121 1121 repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE)
1122 1122 if changing_attr == 'repo_group':
1123 1123 fixture.create_repo_group(updates['group'])
1124 1124
1125 1125 id_, params = _build_data(self.apikey, 'update_repo',
1126 1126 repoid=repo_name, **updates)
1127 1127 response = api_call(self, params)
1128 1128 if changing_attr == 'name':
1129 1129 repo_name = '%s/%s' % (group_name, updates['name'])
1130 1130 if changing_attr == 'repo_group':
1131 1131 repo_name = '/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
1132 1132 try:
1133 1133 if changing_attr == 'clone_uri' and updates['clone_uri']:
1134 1134 expected = 'failed to update repo `%s`' % repo_name
1135 1135 self._compare_error(id_, expected, given=response.body)
1136 1136 else:
1137 1137 expected = {
1138 1138 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
1139 1139 'repository': repo.get_api_data()
1140 1140 }
1141 1141 self._compare_ok(id_, expected, given=response.body)
1142 1142 finally:
1143 1143 fixture.destroy_repo(repo_name)
1144 1144 if changing_attr == 'repo_group':
1145 1145 fixture.destroy_repo_group(updates['group'])
1146 1146 fixture.destroy_repo_group(group_name)
1147 1147
1148 1148 def test_api_update_repo_repo_group_does_not_exist(self):
1149 1149 repo_name = 'admin_owned'
1150 1150 fixture.create_repo(repo_name)
1151 1151 updates = {'group': 'test_group_for_update'}
1152 1152 id_, params = _build_data(self.apikey, 'update_repo',
1153 1153 repoid=repo_name, **updates)
1154 1154 response = api_call(self, params)
1155 1155 try:
1156 1156 expected = 'repository group `%s` does not exist' % updates['group']
1157 1157 self._compare_error(id_, expected, given=response.body)
1158 1158 finally:
1159 1159 fixture.destroy_repo(repo_name)
1160 1160
1161 1161 def test_api_update_repo_regular_user_not_allowed(self):
1162 1162 repo_name = 'admin_owned'
1163 1163 fixture.create_repo(repo_name)
1164 1164 updates = {'description': 'something else'}
1165 1165 id_, params = _build_data(self.apikey_regular, 'update_repo',
1166 1166 repoid=repo_name, **updates)
1167 1167 response = api_call(self, params)
1168 1168 try:
1169 1169 expected = 'repository `%s` does not exist' % repo_name
1170 1170 self._compare_error(id_, expected, given=response.body)
1171 1171 finally:
1172 1172 fixture.destroy_repo(repo_name)
1173 1173
1174 1174 @mock.patch.object(RepoModel, 'update', raise_exception)
1175 1175 def test_api_update_repo_exception_occurred(self):
1176 1176 repo_name = 'api_update_me'
1177 1177 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1178 1178 id_, params = _build_data(self.apikey, 'update_repo',
1179 1179 repoid=repo_name, owner=base.TEST_USER_ADMIN_LOGIN,)
1180 1180 response = api_call(self, params)
1181 1181 try:
1182 1182 expected = 'failed to update repo `%s`' % repo_name
1183 1183 self._compare_error(id_, expected, given=response.body)
1184 1184 finally:
1185 1185 fixture.destroy_repo(repo_name)
1186 1186
1187 1187 def test_api_update_repo_regular_user_change_top_level_repo_name(self):
1188 1188 repo_name = 'admin_owned'
1189 1189 new_repo_name = 'new_repo_name'
1190 1190 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1191 1191 RepoModel().grant_user_permission(repo=repo_name,
1192 1192 user=self.TEST_USER_LOGIN,
1193 1193 perm='repository.admin')
1194 1194 UserModel().revoke_perm('default', 'hg.create.repository')
1195 1195 UserModel().grant_perm('default', 'hg.create.none')
1196 1196 updates = {'name': new_repo_name}
1197 1197 id_, params = _build_data(self.apikey_regular, 'update_repo',
1198 1198 repoid=repo_name, **updates)
1199 1199 response = api_call(self, params)
1200 1200 try:
1201 1201 expected = 'no permission to create (or move) top level repositories'
1202 1202 self._compare_error(id_, expected, given=response.body)
1203 1203 finally:
1204 1204 fixture.destroy_repo(repo_name)
1205 1205 fixture.destroy_repo(new_repo_name)
1206 1206
1207 1207 def test_api_update_repo_regular_user_change_repo_name_allowed(self):
1208 1208 repo_name = 'admin_owned'
1209 1209 new_repo_name = 'new_repo_name'
1210 1210 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1211 1211 RepoModel().grant_user_permission(repo=repo_name,
1212 1212 user=self.TEST_USER_LOGIN,
1213 1213 perm='repository.admin')
1214 1214 UserModel().revoke_perm('default', 'hg.create.none')
1215 1215 UserModel().grant_perm('default', 'hg.create.repository')
1216 1216 updates = {'name': new_repo_name}
1217 1217 id_, params = _build_data(self.apikey_regular, 'update_repo',
1218 1218 repoid=repo_name, **updates)
1219 1219 response = api_call(self, params)
1220 1220 try:
1221 1221 expected = {
1222 1222 'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
1223 1223 'repository': repo.get_api_data()
1224 1224 }
1225 1225 self._compare_ok(id_, expected, given=response.body)
1226 1226 finally:
1227 1227 fixture.destroy_repo(repo_name)
1228 1228 fixture.destroy_repo(new_repo_name)
1229 1229
1230 1230 def test_api_update_repo_regular_user_change_owner(self):
1231 1231 repo_name = 'admin_owned'
1232 1232 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1233 1233 RepoModel().grant_user_permission(repo=repo_name,
1234 1234 user=self.TEST_USER_LOGIN,
1235 1235 perm='repository.admin')
1236 1236 updates = {'owner': base.TEST_USER_ADMIN_LOGIN}
1237 1237 id_, params = _build_data(self.apikey_regular, 'update_repo',
1238 1238 repoid=repo_name, **updates)
1239 1239 response = api_call(self, params)
1240 1240 try:
1241 1241 expected = 'Only Kallithea admin can specify `owner` param'
1242 1242 self._compare_error(id_, expected, given=response.body)
1243 1243 finally:
1244 1244 fixture.destroy_repo(repo_name)
1245 1245
1246 1246 def test_api_delete_repo(self):
1247 1247 repo_name = 'api_delete_me'
1248 1248 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1249 1249
1250 1250 id_, params = _build_data(self.apikey, 'delete_repo',
1251 1251 repoid=repo_name, )
1252 1252 response = api_call(self, params)
1253 1253
1254 1254 ret = {
1255 1255 'msg': 'Deleted repository `%s`' % repo_name,
1256 1256 'success': True
1257 1257 }
1258 1258 try:
1259 1259 expected = ret
1260 1260 self._compare_ok(id_, expected, given=response.body)
1261 1261 finally:
1262 1262 fixture.destroy_repo(repo_name)
1263 1263
1264 1264 def test_api_delete_repo_by_non_admin(self):
1265 1265 repo_name = 'api_delete_me'
1266 1266 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
1267 1267 cur_user=self.TEST_USER_LOGIN)
1268 1268 id_, params = _build_data(self.apikey_regular, 'delete_repo',
1269 1269 repoid=repo_name, )
1270 1270 response = api_call(self, params)
1271 1271
1272 1272 ret = {
1273 1273 'msg': 'Deleted repository `%s`' % repo_name,
1274 1274 'success': True
1275 1275 }
1276 1276 try:
1277 1277 expected = ret
1278 1278 self._compare_ok(id_, expected, given=response.body)
1279 1279 finally:
1280 1280 fixture.destroy_repo(repo_name)
1281 1281
1282 1282 def test_api_delete_repo_by_non_admin_no_permission(self):
1283 1283 repo_name = 'api_delete_me'
1284 1284 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1285 1285 try:
1286 1286 id_, params = _build_data(self.apikey_regular, 'delete_repo',
1287 1287 repoid=repo_name, )
1288 1288 response = api_call(self, params)
1289 1289 expected = 'repository `%s` does not exist' % (repo_name)
1290 1290 self._compare_error(id_, expected, given=response.body)
1291 1291 finally:
1292 1292 fixture.destroy_repo(repo_name)
1293 1293
1294 1294 def test_api_delete_repo_exception_occurred(self):
1295 1295 repo_name = 'api_delete_me'
1296 1296 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1297 1297 try:
1298 1298 with mock.patch.object(RepoModel, 'delete', raise_exception):
1299 1299 id_, params = _build_data(self.apikey, 'delete_repo',
1300 1300 repoid=repo_name, )
1301 1301 response = api_call(self, params)
1302 1302
1303 1303 expected = 'failed to delete repository `%s`' % repo_name
1304 1304 self._compare_error(id_, expected, given=response.body)
1305 1305 finally:
1306 1306 fixture.destroy_repo(repo_name)
1307 1307
1308 1308 def test_api_fork_repo(self):
1309 1309 fork_name = 'api-repo-fork'
1310 1310 id_, params = _build_data(self.apikey, 'fork_repo',
1311 1311 repoid=self.REPO,
1312 1312 fork_name=fork_name,
1313 1313 owner=base.TEST_USER_ADMIN_LOGIN,
1314 1314 )
1315 1315 response = api_call(self, params)
1316 1316
1317 1317 ret = {
1318 1318 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
1319 1319 fork_name),
1320 1320 'success': True,
1321 1321 }
1322 1322 expected = ret
1323 1323 self._compare_ok(id_, expected, given=response.body)
1324 1324 fixture.destroy_repo(fork_name)
1325 1325
1326 1326 @base.parametrize('fork_name', [
1327 1327 'api-repo-fork',
1328 1328 '%s/api-repo-fork' % TEST_REPO_GROUP,
1329 1329 ])
1330 1330 def test_api_fork_repo_non_admin(self, fork_name):
1331 1331 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1332 1332 self.TEST_USER_LOGIN,
1333 1333 'group.write')
1334 1334 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1335 1335 repoid=self.REPO,
1336 1336 fork_name=fork_name,
1337 1337 )
1338 1338 response = api_call(self, params)
1339 1339
1340 1340 ret = {
1341 1341 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
1342 1342 fork_name),
1343 1343 'success': True,
1344 1344 }
1345 1345 expected = ret
1346 1346 self._compare_ok(id_, expected, given=response.body)
1347 1347 fixture.destroy_repo(fork_name)
1348 1348
1349 1349 def test_api_fork_repo_non_admin_specify_owner(self):
1350 1350 fork_name = 'api-repo-fork'
1351 1351 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1352 1352 repoid=self.REPO,
1353 1353 fork_name=fork_name,
1354 1354 owner=base.TEST_USER_ADMIN_LOGIN,
1355 1355 )
1356 1356 response = api_call(self, params)
1357 1357 expected = 'Only Kallithea admin can specify `owner` param'
1358 1358 self._compare_error(id_, expected, given=response.body)
1359 1359 fixture.destroy_repo(fork_name)
1360 1360
1361 1361 def test_api_fork_repo_non_admin_no_permission_to_fork(self):
1362 1362 RepoModel().grant_user_permission(repo=self.REPO,
1363 1363 user=db.User.DEFAULT_USER_NAME,
1364 1364 perm='repository.none')
1365 1365 fork_name = 'api-repo-fork'
1366 1366 try:
1367 1367 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1368 1368 repoid=self.REPO,
1369 1369 fork_name=fork_name,
1370 1370 )
1371 1371 response = api_call(self, params)
1372 1372 expected = 'repository `%s` does not exist' % (self.REPO)
1373 1373 self._compare_error(id_, expected, given=response.body)
1374 1374 finally:
1375 1375 RepoModel().grant_user_permission(repo=self.REPO,
1376 1376 user=db.User.DEFAULT_USER_NAME,
1377 1377 perm='repository.read')
1378 1378 fixture.destroy_repo(fork_name)
1379 1379
1380 1380 @base.parametrize('name,perm', [
1381 1381 ('read', 'repository.read'),
1382 1382 ('write', 'repository.write'),
1383 1383 ('admin', 'repository.admin'),
1384 1384 ])
1385 1385 def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
1386 1386 fork_name = 'api-repo-fork'
1387 1387 # regardless of base repository permission, forking is disallowed
1388 1388 # when repository creation is disabled
1389 1389 RepoModel().grant_user_permission(repo=self.REPO,
1390 1390 user=self.TEST_USER_LOGIN,
1391 1391 perm=perm)
1392 1392 UserModel().revoke_perm('default', 'hg.create.repository')
1393 1393 UserModel().grant_perm('default', 'hg.create.none')
1394 1394 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1395 1395 repoid=self.REPO,
1396 1396 fork_name=fork_name,
1397 1397 )
1398 1398 response = api_call(self, params)
1399 1399 expected = 'no permission to create top level repo'
1400 1400 self._compare_error(id_, expected, given=response.body)
1401 1401 fixture.destroy_repo(fork_name)
1402 1402
1403 1403 def test_api_fork_repo_unknown_owner(self):
1404 1404 fork_name = 'api-repo-fork'
1405 1405 owner = 'i-dont-exist'
1406 1406 id_, params = _build_data(self.apikey, 'fork_repo',
1407 1407 repoid=self.REPO,
1408 1408 fork_name=fork_name,
1409 1409 owner=owner,
1410 1410 )
1411 1411 response = api_call(self, params)
1412 1412 expected = 'user `%s` does not exist' % owner
1413 1413 self._compare_error(id_, expected, given=response.body)
1414 1414
1415 1415 def test_api_fork_repo_fork_exists(self):
1416 1416 fork_name = 'api-repo-fork'
1417 1417 fixture.create_fork(self.REPO, fork_name)
1418 1418
1419 1419 try:
1420 1420 fork_name = 'api-repo-fork'
1421 1421
1422 1422 id_, params = _build_data(self.apikey, 'fork_repo',
1423 1423 repoid=self.REPO,
1424 1424 fork_name=fork_name,
1425 1425 owner=base.TEST_USER_ADMIN_LOGIN,
1426 1426 )
1427 1427 response = api_call(self, params)
1428 1428
1429 1429 expected = "fork `%s` already exist" % fork_name
1430 1430 self._compare_error(id_, expected, given=response.body)
1431 1431 finally:
1432 1432 fixture.destroy_repo(fork_name)
1433 1433
1434 1434 def test_api_fork_repo_repo_exists(self):
1435 1435 fork_name = self.REPO
1436 1436
1437 1437 id_, params = _build_data(self.apikey, 'fork_repo',
1438 1438 repoid=self.REPO,
1439 1439 fork_name=fork_name,
1440 1440 owner=base.TEST_USER_ADMIN_LOGIN,
1441 1441 )
1442 1442 response = api_call(self, params)
1443 1443
1444 1444 expected = "repo `%s` already exist" % fork_name
1445 1445 self._compare_error(id_, expected, given=response.body)
1446 1446
1447 1447 @mock.patch.object(RepoModel, 'create_fork', raise_exception)
1448 1448 def test_api_fork_repo_exception_occurred(self):
1449 1449 fork_name = 'api-repo-fork'
1450 1450 id_, params = _build_data(self.apikey, 'fork_repo',
1451 1451 repoid=self.REPO,
1452 1452 fork_name=fork_name,
1453 1453 owner=base.TEST_USER_ADMIN_LOGIN,
1454 1454 )
1455 1455 response = api_call(self, params)
1456 1456
1457 1457 expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
1458 1458 fork_name)
1459 1459 self._compare_error(id_, expected, given=response.body)
1460 1460
1461 1461 def test_api_get_user_group(self):
1462 1462 id_, params = _build_data(self.apikey, 'get_user_group',
1463 1463 usergroupid=TEST_USER_GROUP)
1464 1464 response = api_call(self, params)
1465 1465
1466 1466 user_group = UserGroupModel().get_group(TEST_USER_GROUP)
1467 1467 members = []
1468 1468 for user in user_group.members:
1469 1469 user = user.user
1470 1470 members.append(user.get_api_data())
1471 1471
1472 1472 ret = user_group.get_api_data()
1473 1473 ret['members'] = members
1474 1474 expected = ret
1475 1475 self._compare_ok(id_, expected, given=response.body)
1476 1476
1477 1477 def test_api_get_user_groups(self):
1478 1478 gr_name = 'test_user_group2'
1479 1479 make_user_group(gr_name)
1480 1480
1481 1481 try:
1482 1482 id_, params = _build_data(self.apikey, 'get_user_groups', )
1483 1483 response = api_call(self, params)
1484 1484
1485 1485 expected = []
1486 1486 for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
1487 1487 user_group = UserGroupModel().get_group(gr_name)
1488 1488 ret = user_group.get_api_data()
1489 1489 expected.append(ret)
1490 1490 self._compare_ok(id_, expected, given=response.body)
1491 1491 finally:
1492 1492 fixture.destroy_user_group(gr_name)
1493 1493
1494 1494 def test_api_create_user_group(self):
1495 1495 group_name = 'some_new_group'
1496 1496 id_, params = _build_data(self.apikey, 'create_user_group',
1497 1497 group_name=group_name)
1498 1498 response = api_call(self, params)
1499 1499
1500 1500 ret = {
1501 1501 'msg': 'created new user group `%s`' % group_name,
1502 1502 'user_group': jsonify(UserGroupModel() \
1503 1503 .get_by_name(group_name) \
1504 1504 .get_api_data())
1505 1505 }
1506 1506 expected = ret
1507 1507 self._compare_ok(id_, expected, given=response.body)
1508 1508
1509 1509 fixture.destroy_user_group(group_name)
1510 1510
1511 1511 def test_api_get_user_group_that_exist(self):
1512 1512 id_, params = _build_data(self.apikey, 'create_user_group',
1513 1513 group_name=TEST_USER_GROUP)
1514 1514 response = api_call(self, params)
1515 1515
1516 1516 expected = "user group `%s` already exist" % TEST_USER_GROUP
1517 1517 self._compare_error(id_, expected, given=response.body)
1518 1518
1519 1519 @mock.patch.object(UserGroupModel, 'create', raise_exception)
1520 1520 def test_api_get_user_group_exception_occurred(self):
1521 1521 group_name = 'exception_happens'
1522 1522 id_, params = _build_data(self.apikey, 'create_user_group',
1523 1523 group_name=group_name)
1524 1524 response = api_call(self, params)
1525 1525
1526 1526 expected = 'failed to create group `%s`' % group_name
1527 1527 self._compare_error(id_, expected, given=response.body)
1528 1528
1529 1529 @base.parametrize('changing_attr,updates', [
1530 1530 ('group_name', {'group_name': 'new_group_name'}),
1531 1531 ('group_name', {'group_name': 'test_group_for_update'}),
1532 1532 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
1533 1533 ('active', {'active': False}),
1534 1534 ('active', {'active': True}),
1535 1535 ])
1536 1536 def test_api_update_user_group(self, changing_attr, updates):
1537 1537 gr_name = 'test_group_for_update'
1538 1538 user_group = fixture.create_user_group(gr_name)
1539 1539 try:
1540 1540 id_, params = _build_data(self.apikey, 'update_user_group',
1541 1541 usergroupid=gr_name, **updates)
1542 1542 response = api_call(self, params)
1543 1543 expected = {
1544 1544 'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
1545 1545 user_group.users_group_name),
1546 1546 'user_group': user_group.get_api_data()
1547 1547 }
1548 1548 self._compare_ok(id_, expected, given=response.body)
1549 1549 finally:
1550 1550 if changing_attr == 'group_name':
1551 1551 # switch to updated name for proper cleanup
1552 1552 gr_name = updates['group_name']
1553 1553 fixture.destroy_user_group(gr_name)
1554 1554
1555 1555 @mock.patch.object(UserGroupModel, 'update', raise_exception)
1556 1556 def test_api_update_user_group_exception_occurred(self):
1557 1557 gr_name = 'test_group'
1558 1558 fixture.create_user_group(gr_name)
1559 1559 try:
1560 1560 id_, params = _build_data(self.apikey, 'update_user_group',
1561 1561 usergroupid=gr_name)
1562 1562 response = api_call(self, params)
1563 1563 expected = 'failed to update user group `%s`' % gr_name
1564 1564 self._compare_error(id_, expected, given=response.body)
1565 1565 finally:
1566 1566 fixture.destroy_user_group(gr_name)
1567 1567
1568 1568 def test_api_add_user_to_user_group(self):
1569 1569 gr_name = 'test_group'
1570 1570 fixture.create_user_group(gr_name)
1571 1571 try:
1572 1572 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1573 1573 usergroupid=gr_name,
1574 1574 userid=base.TEST_USER_ADMIN_LOGIN)
1575 1575 response = api_call(self, params)
1576 1576 expected = {
1577 1577 'msg': 'added member `%s` to user group `%s`' % (
1578 1578 base.TEST_USER_ADMIN_LOGIN, gr_name),
1579 1579 'success': True
1580 1580 }
1581 1581 self._compare_ok(id_, expected, given=response.body)
1582 1582 finally:
1583 1583 fixture.destroy_user_group(gr_name)
1584 1584
1585 1585 def test_api_add_user_to_user_group_that_doesnt_exist(self):
1586 1586 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1587 1587 usergroupid='false-group',
1588 1588 userid=base.TEST_USER_ADMIN_LOGIN)
1589 1589 response = api_call(self, params)
1590 1590
1591 1591 expected = 'user group `%s` does not exist' % 'false-group'
1592 1592 self._compare_error(id_, expected, given=response.body)
1593 1593
1594 1594 @mock.patch.object(UserGroupModel, 'add_user_to_group', raise_exception)
1595 1595 def test_api_add_user_to_user_group_exception_occurred(self):
1596 1596 gr_name = 'test_group'
1597 1597 fixture.create_user_group(gr_name)
1598 1598 try:
1599 1599 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1600 1600 usergroupid=gr_name,
1601 1601 userid=base.TEST_USER_ADMIN_LOGIN)
1602 1602 response = api_call(self, params)
1603 1603 expected = 'failed to add member to user group `%s`' % gr_name
1604 1604 self._compare_error(id_, expected, given=response.body)
1605 1605 finally:
1606 1606 fixture.destroy_user_group(gr_name)
1607 1607
1608 1608 def test_api_remove_user_from_user_group(self):
1609 1609 gr_name = 'test_group_3'
1610 1610 gr = fixture.create_user_group(gr_name)
1611 1611 UserGroupModel().add_user_to_group(gr, user=base.TEST_USER_ADMIN_LOGIN)
1612 1612 meta.Session().commit()
1613 1613 try:
1614 1614 id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
1615 1615 usergroupid=gr_name,
1616 1616 userid=base.TEST_USER_ADMIN_LOGIN)
1617 1617 response = api_call(self, params)
1618 1618 expected = {
1619 1619 'msg': 'removed member `%s` from user group `%s`' % (
1620 1620 base.TEST_USER_ADMIN_LOGIN, gr_name
1621 1621 ),
1622 1622 'success': True}
1623 1623 self._compare_ok(id_, expected, given=response.body)
1624 1624 finally:
1625 1625 fixture.destroy_user_group(gr_name)
1626 1626
1627 1627 @mock.patch.object(UserGroupModel, 'remove_user_from_group', raise_exception)
1628 1628 def test_api_remove_user_from_user_group_exception_occurred(self):
1629 1629 gr_name = 'test_group_3'
1630 1630 gr = fixture.create_user_group(gr_name)
1631 1631 UserGroupModel().add_user_to_group(gr, user=base.TEST_USER_ADMIN_LOGIN)
1632 1632 meta.Session().commit()
1633 1633 try:
1634 1634 id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
1635 1635 usergroupid=gr_name,
1636 1636 userid=base.TEST_USER_ADMIN_LOGIN)
1637 1637 response = api_call(self, params)
1638 1638 expected = 'failed to remove member from user group `%s`' % gr_name
1639 1639 self._compare_error(id_, expected, given=response.body)
1640 1640 finally:
1641 1641 fixture.destroy_user_group(gr_name)
1642 1642
1643 1643 def test_api_delete_user_group(self):
1644 1644 gr_name = 'test_group'
1645 1645 ugroup = fixture.create_user_group(gr_name)
1646 1646 gr_id = ugroup.users_group_id
1647 1647 try:
1648 1648 id_, params = _build_data(self.apikey, 'delete_user_group',
1649 1649 usergroupid=gr_name)
1650 1650 response = api_call(self, params)
1651 1651 expected = {
1652 1652 'user_group': None,
1653 1653 'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name)
1654 1654 }
1655 1655 self._compare_ok(id_, expected, given=response.body)
1656 1656 finally:
1657 1657 if UserGroupModel().get_by_name(gr_name):
1658 1658 fixture.destroy_user_group(gr_name)
1659 1659
1660 1660 def test_api_delete_user_group_that_is_assigned(self):
1661 1661 gr_name = 'test_group'
1662 1662 ugroup = fixture.create_user_group(gr_name)
1663 1663 gr_id = ugroup.users_group_id
1664 1664
1665 1665 ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write')
1666 1666 msg = 'User Group assigned to %s' % ugr_to_perm.repository.repo_name
1667 1667
1668 1668 try:
1669 1669 id_, params = _build_data(self.apikey, 'delete_user_group',
1670 1670 usergroupid=gr_name)
1671 1671 response = api_call(self, params)
1672 1672 expected = msg
1673 1673 self._compare_error(id_, expected, given=response.body)
1674 1674 finally:
1675 1675 if UserGroupModel().get_by_name(gr_name):
1676 1676 fixture.destroy_user_group(gr_name)
1677 1677
1678 1678 def test_api_delete_user_group_exception_occurred(self):
1679 1679 gr_name = 'test_group'
1680 1680 ugroup = fixture.create_user_group(gr_name)
1681 1681 gr_id = ugroup.users_group_id
1682 1682 id_, params = _build_data(self.apikey, 'delete_user_group',
1683 1683 usergroupid=gr_name)
1684 1684
1685 1685 try:
1686 1686 with mock.patch.object(UserGroupModel, 'delete', raise_exception):
1687 1687 response = api_call(self, params)
1688 1688 expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
1689 1689 self._compare_error(id_, expected, given=response.body)
1690 1690 finally:
1691 1691 fixture.destroy_user_group(gr_name)
1692 1692
1693 1693 @base.parametrize('name,perm', [
1694 1694 ('none', 'repository.none'),
1695 1695 ('read', 'repository.read'),
1696 1696 ('write', 'repository.write'),
1697 1697 ('admin', 'repository.admin'),
1698 1698 ])
1699 1699 def test_api_grant_user_permission(self, name, perm):
1700 1700 id_, params = _build_data(self.apikey,
1701 1701 'grant_user_permission',
1702 1702 repoid=self.REPO,
1703 1703 userid=base.TEST_USER_ADMIN_LOGIN,
1704 1704 perm=perm)
1705 1705 response = api_call(self, params)
1706 1706
1707 1707 ret = {
1708 1708 'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
1709 1709 perm, base.TEST_USER_ADMIN_LOGIN, self.REPO
1710 1710 ),
1711 1711 'success': True
1712 1712 }
1713 1713 expected = ret
1714 1714 self._compare_ok(id_, expected, given=response.body)
1715 1715
1716 1716 def test_api_grant_user_permission_wrong_permission(self):
1717 1717 perm = 'haha.no.permission'
1718 1718 id_, params = _build_data(self.apikey,
1719 1719 'grant_user_permission',
1720 1720 repoid=self.REPO,
1721 1721 userid=base.TEST_USER_ADMIN_LOGIN,
1722 1722 perm=perm)
1723 1723 response = api_call(self, params)
1724 1724
1725 1725 expected = 'permission `%s` does not exist' % perm
1726 1726 self._compare_error(id_, expected, given=response.body)
1727 1727
1728 1728 @mock.patch.object(RepoModel, 'grant_user_permission', raise_exception)
1729 1729 def test_api_grant_user_permission_exception_when_adding(self):
1730 1730 perm = 'repository.read'
1731 1731 id_, params = _build_data(self.apikey,
1732 1732 'grant_user_permission',
1733 1733 repoid=self.REPO,
1734 1734 userid=base.TEST_USER_ADMIN_LOGIN,
1735 1735 perm=perm)
1736 1736 response = api_call(self, params)
1737 1737
1738 1738 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1739 1739 base.TEST_USER_ADMIN_LOGIN, self.REPO
1740 1740 )
1741 1741 self._compare_error(id_, expected, given=response.body)
1742 1742
1743 1743 def test_api_revoke_user_permission(self):
1744 1744 id_, params = _build_data(self.apikey,
1745 1745 'revoke_user_permission',
1746 1746 repoid=self.REPO,
1747 1747 userid=base.TEST_USER_ADMIN_LOGIN, )
1748 1748 response = api_call(self, params)
1749 1749
1750 1750 expected = {
1751 1751 'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
1752 1752 base.TEST_USER_ADMIN_LOGIN, self.REPO
1753 1753 ),
1754 1754 'success': True
1755 1755 }
1756 1756 self._compare_ok(id_, expected, given=response.body)
1757 1757
1758 1758 @mock.patch.object(RepoModel, 'revoke_user_permission', raise_exception)
1759 1759 def test_api_revoke_user_permission_exception_when_adding(self):
1760 1760 id_, params = _build_data(self.apikey,
1761 1761 'revoke_user_permission',
1762 1762 repoid=self.REPO,
1763 1763 userid=base.TEST_USER_ADMIN_LOGIN, )
1764 1764 response = api_call(self, params)
1765 1765
1766 1766 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1767 1767 base.TEST_USER_ADMIN_LOGIN, self.REPO
1768 1768 )
1769 1769 self._compare_error(id_, expected, given=response.body)
1770 1770
1771 1771 @base.parametrize('name,perm', [
1772 1772 ('none', 'repository.none'),
1773 1773 ('read', 'repository.read'),
1774 1774 ('write', 'repository.write'),
1775 1775 ('admin', 'repository.admin'),
1776 1776 ])
1777 1777 def test_api_grant_user_group_permission(self, name, perm):
1778 1778 id_, params = _build_data(self.apikey,
1779 1779 'grant_user_group_permission',
1780 1780 repoid=self.REPO,
1781 1781 usergroupid=TEST_USER_GROUP,
1782 1782 perm=perm)
1783 1783 response = api_call(self, params)
1784 1784
1785 1785 ret = {
1786 1786 'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % (
1787 1787 perm, TEST_USER_GROUP, self.REPO
1788 1788 ),
1789 1789 'success': True
1790 1790 }
1791 1791 expected = ret
1792 1792 self._compare_ok(id_, expected, given=response.body)
1793 1793
1794 1794 def test_api_grant_user_group_permission_wrong_permission(self):
1795 1795 perm = 'haha.no.permission'
1796 1796 id_, params = _build_data(self.apikey,
1797 1797 'grant_user_group_permission',
1798 1798 repoid=self.REPO,
1799 1799 usergroupid=TEST_USER_GROUP,
1800 1800 perm=perm)
1801 1801 response = api_call(self, params)
1802 1802
1803 1803 expected = 'permission `%s` does not exist' % perm
1804 1804 self._compare_error(id_, expected, given=response.body)
1805 1805
1806 1806 @mock.patch.object(RepoModel, 'grant_user_group_permission', raise_exception)
1807 1807 def test_api_grant_user_group_permission_exception_when_adding(self):
1808 1808 perm = 'repository.read'
1809 1809 id_, params = _build_data(self.apikey,
1810 1810 'grant_user_group_permission',
1811 1811 repoid=self.REPO,
1812 1812 usergroupid=TEST_USER_GROUP,
1813 1813 perm=perm)
1814 1814 response = api_call(self, params)
1815 1815
1816 1816 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1817 1817 TEST_USER_GROUP, self.REPO
1818 1818 )
1819 1819 self._compare_error(id_, expected, given=response.body)
1820 1820
1821 1821 def test_api_revoke_user_group_permission(self):
1822 1822 RepoModel().grant_user_group_permission(repo=self.REPO,
1823 1823 group_name=TEST_USER_GROUP,
1824 1824 perm='repository.read')
1825 1825 meta.Session().commit()
1826 1826 id_, params = _build_data(self.apikey,
1827 1827 'revoke_user_group_permission',
1828 1828 repoid=self.REPO,
1829 1829 usergroupid=TEST_USER_GROUP, )
1830 1830 response = api_call(self, params)
1831 1831
1832 1832 expected = {
1833 1833 'msg': 'Revoked perm for user group: `%s` in repo: `%s`' % (
1834 1834 TEST_USER_GROUP, self.REPO
1835 1835 ),
1836 1836 'success': True
1837 1837 }
1838 1838 self._compare_ok(id_, expected, given=response.body)
1839 1839
1840 1840 @mock.patch.object(RepoModel, 'revoke_user_group_permission', raise_exception)
1841 1841 def test_api_revoke_user_group_permission_exception_when_adding(self):
1842 1842 id_, params = _build_data(self.apikey,
1843 1843 'revoke_user_group_permission',
1844 1844 repoid=self.REPO,
1845 1845 usergroupid=TEST_USER_GROUP, )
1846 1846 response = api_call(self, params)
1847 1847
1848 1848 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1849 1849 TEST_USER_GROUP, self.REPO
1850 1850 )
1851 1851 self._compare_error(id_, expected, given=response.body)
1852 1852
1853 1853 @base.parametrize('changing_attr,updates', [
1854 #('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}), # currently broken
1854 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
1855 1855 ('description', {'description': 'new description'}),
1856 1856 ('group_name', {'group_name': 'new_repo_name'}),
1857 1857 ('parent', {'parent': 'test_group_for_update'}),
1858 1858 ])
1859 1859 def test_api_update_repo_group(self, changing_attr, updates):
1860 1860 group_name = 'lololo'
1861 1861 repo_group = fixture.create_repo_group(group_name)
1862 1862
1863 1863 new_group_name = group_name
1864 1864 if changing_attr == 'group_name':
1865 1865 assert repo_group.parent_group_id is None # lazy assumption for this test
1866 1866 new_group_name = updates['group_name']
1867 1867 if changing_attr == 'parent':
1868 1868 new_group_name = '/'.join([updates['parent'], group_name.rsplit('/', 1)[-1]])
1869 1869
1870 1870 expected = {
1871 1871 'msg': 'updated repository group ID:%s %s' % (repo_group.group_id, new_group_name),
1872 1872 'repo_group': repo_group.get_api_data()
1873 1873 }
1874 1874 expected['repo_group'].update(updates)
1875 1875 if 'description' in updates:
1876 1876 expected['repo_group']['group_description'] = expected['repo_group'].pop('description')
1877 1877
1878 1878 if changing_attr == 'parent':
1879 1879 new_parent = fixture.create_repo_group(updates['parent'])
1880 1880 expected['repo_group']['parent_group'] = expected['repo_group'].pop('parent')
1881 1881 expected['repo_group']['group_name'] = new_group_name
1882 1882
1883 1883 id_, params = _build_data(self.apikey, 'update_repo_group',
1884 1884 repogroupid=group_name, **updates)
1885 1885 response = api_call(self, params)
1886 1886
1887 1887 try:
1888 1888 self._compare_ok(id_, expected, given=response.body)
1889 1889 finally:
1890 1890 if changing_attr == 'parent':
1891 1891 fixture.destroy_repo_group(new_parent.group_id)
1892 1892 fixture.destroy_repo_group(new_group_name)
1893 1893
1894 1894 @base.parametrize('name,perm,apply_to_children', [
1895 1895 ('none', 'group.none', 'none'),
1896 1896 ('read', 'group.read', 'none'),
1897 1897 ('write', 'group.write', 'none'),
1898 1898 ('admin', 'group.admin', 'none'),
1899 1899
1900 1900 ('none', 'group.none', 'all'),
1901 1901 ('read', 'group.read', 'all'),
1902 1902 ('write', 'group.write', 'all'),
1903 1903 ('admin', 'group.admin', 'all'),
1904 1904
1905 1905 ('none', 'group.none', 'repos'),
1906 1906 ('read', 'group.read', 'repos'),
1907 1907 ('write', 'group.write', 'repos'),
1908 1908 ('admin', 'group.admin', 'repos'),
1909 1909
1910 1910 ('none', 'group.none', 'groups'),
1911 1911 ('read', 'group.read', 'groups'),
1912 1912 ('write', 'group.write', 'groups'),
1913 1913 ('admin', 'group.admin', 'groups'),
1914 1914 ])
1915 1915 def test_api_grant_user_permission_to_repo_group(self, name, perm, apply_to_children):
1916 1916 id_, params = _build_data(self.apikey,
1917 1917 'grant_user_permission_to_repo_group',
1918 1918 repogroupid=TEST_REPO_GROUP,
1919 1919 userid=base.TEST_USER_ADMIN_LOGIN,
1920 1920 perm=perm, apply_to_children=apply_to_children)
1921 1921 response = api_call(self, params)
1922 1922
1923 1923 ret = {
1924 1924 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1925 1925 perm, apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1926 1926 ),
1927 1927 'success': True
1928 1928 }
1929 1929 expected = ret
1930 1930 self._compare_ok(id_, expected, given=response.body)
1931 1931
1932 1932 @base.parametrize('name,perm,apply_to_children,grant_admin,access_ok', [
1933 1933 ('none_fails', 'group.none', 'none', False, False),
1934 1934 ('read_fails', 'group.read', 'none', False, False),
1935 1935 ('write_fails', 'group.write', 'none', False, False),
1936 1936 ('admin_fails', 'group.admin', 'none', False, False),
1937 1937
1938 1938 # with granted perms
1939 1939 ('none_ok', 'group.none', 'none', True, True),
1940 1940 ('read_ok', 'group.read', 'none', True, True),
1941 1941 ('write_ok', 'group.write', 'none', True, True),
1942 1942 ('admin_ok', 'group.admin', 'none', True, True),
1943 1943 ])
1944 1944 def test_api_grant_user_permission_to_repo_group_by_regular_user(
1945 1945 self, name, perm, apply_to_children, grant_admin, access_ok):
1946 1946 if grant_admin:
1947 1947 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1948 1948 self.TEST_USER_LOGIN,
1949 1949 'group.admin')
1950 1950 meta.Session().commit()
1951 1951
1952 1952 id_, params = _build_data(self.apikey_regular,
1953 1953 'grant_user_permission_to_repo_group',
1954 1954 repogroupid=TEST_REPO_GROUP,
1955 1955 userid=base.TEST_USER_ADMIN_LOGIN,
1956 1956 perm=perm, apply_to_children=apply_to_children)
1957 1957 response = api_call(self, params)
1958 1958 if access_ok:
1959 1959 ret = {
1960 1960 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1961 1961 perm, apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1962 1962 ),
1963 1963 'success': True
1964 1964 }
1965 1965 expected = ret
1966 1966 self._compare_ok(id_, expected, given=response.body)
1967 1967 else:
1968 1968 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
1969 1969 self._compare_error(id_, expected, given=response.body)
1970 1970
1971 1971 def test_api_grant_user_permission_to_repo_group_wrong_permission(self):
1972 1972 perm = 'haha.no.permission'
1973 1973 id_, params = _build_data(self.apikey,
1974 1974 'grant_user_permission_to_repo_group',
1975 1975 repogroupid=TEST_REPO_GROUP,
1976 1976 userid=base.TEST_USER_ADMIN_LOGIN,
1977 1977 perm=perm)
1978 1978 response = api_call(self, params)
1979 1979
1980 1980 expected = 'permission `%s` does not exist' % perm
1981 1981 self._compare_error(id_, expected, given=response.body)
1982 1982
1983 1983 @mock.patch.object(RepoGroupModel, 'grant_user_permission', raise_exception)
1984 1984 def test_api_grant_user_permission_to_repo_group_exception_when_adding(self):
1985 1985 perm = 'group.read'
1986 1986 id_, params = _build_data(self.apikey,
1987 1987 'grant_user_permission_to_repo_group',
1988 1988 repogroupid=TEST_REPO_GROUP,
1989 1989 userid=base.TEST_USER_ADMIN_LOGIN,
1990 1990 perm=perm)
1991 1991 response = api_call(self, params)
1992 1992
1993 1993 expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
1994 1994 base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1995 1995 )
1996 1996 self._compare_error(id_, expected, given=response.body)
1997 1997
1998 1998 @base.parametrize('name,apply_to_children', [
1999 1999 ('none', 'none'),
2000 2000 ('all', 'all'),
2001 2001 ('repos', 'repos'),
2002 2002 ('groups', 'groups'),
2003 2003 ])
2004 2004 def test_api_revoke_user_permission_from_repo_group(self, name, apply_to_children):
2005 2005 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
2006 2006 user=base.TEST_USER_ADMIN_LOGIN,
2007 2007 perm='group.read',)
2008 2008 meta.Session().commit()
2009 2009
2010 2010 id_, params = _build_data(self.apikey,
2011 2011 'revoke_user_permission_from_repo_group',
2012 2012 repogroupid=TEST_REPO_GROUP,
2013 2013 userid=base.TEST_USER_ADMIN_LOGIN,
2014 2014 apply_to_children=apply_to_children,)
2015 2015 response = api_call(self, params)
2016 2016
2017 2017 expected = {
2018 2018 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
2019 2019 apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2020 2020 ),
2021 2021 'success': True
2022 2022 }
2023 2023 self._compare_ok(id_, expected, given=response.body)
2024 2024
2025 2025 @base.parametrize('name,apply_to_children,grant_admin,access_ok', [
2026 2026 ('none', 'none', False, False),
2027 2027 ('all', 'all', False, False),
2028 2028 ('repos', 'repos', False, False),
2029 2029 ('groups', 'groups', False, False),
2030 2030
2031 2031 # after granting admin rights
2032 2032 ('none', 'none', False, False),
2033 2033 ('all', 'all', False, False),
2034 2034 ('repos', 'repos', False, False),
2035 2035 ('groups', 'groups', False, False),
2036 2036 ])
2037 2037 def test_api_revoke_user_permission_from_repo_group_by_regular_user(
2038 2038 self, name, apply_to_children, grant_admin, access_ok):
2039 2039 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
2040 2040 user=base.TEST_USER_ADMIN_LOGIN,
2041 2041 perm='group.read',)
2042 2042 meta.Session().commit()
2043 2043
2044 2044 if grant_admin:
2045 2045 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
2046 2046 self.TEST_USER_LOGIN,
2047 2047 'group.admin')
2048 2048 meta.Session().commit()
2049 2049
2050 2050 id_, params = _build_data(self.apikey_regular,
2051 2051 'revoke_user_permission_from_repo_group',
2052 2052 repogroupid=TEST_REPO_GROUP,
2053 2053 userid=base.TEST_USER_ADMIN_LOGIN,
2054 2054 apply_to_children=apply_to_children,)
2055 2055 response = api_call(self, params)
2056 2056 if access_ok:
2057 2057 expected = {
2058 2058 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
2059 2059 apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2060 2060 ),
2061 2061 'success': True
2062 2062 }
2063 2063 self._compare_ok(id_, expected, given=response.body)
2064 2064 else:
2065 2065 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2066 2066 self._compare_error(id_, expected, given=response.body)
2067 2067
2068 2068 @mock.patch.object(RepoGroupModel, 'revoke_user_permission', raise_exception)
2069 2069 def test_api_revoke_user_permission_from_repo_group_exception_when_adding(self):
2070 2070 id_, params = _build_data(self.apikey,
2071 2071 'revoke_user_permission_from_repo_group',
2072 2072 repogroupid=TEST_REPO_GROUP,
2073 2073 userid=base.TEST_USER_ADMIN_LOGIN, )
2074 2074 response = api_call(self, params)
2075 2075
2076 2076 expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
2077 2077 base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2078 2078 )
2079 2079 self._compare_error(id_, expected, given=response.body)
2080 2080
2081 2081 @base.parametrize('name,perm,apply_to_children', [
2082 2082 ('none', 'group.none', 'none'),
2083 2083 ('read', 'group.read', 'none'),
2084 2084 ('write', 'group.write', 'none'),
2085 2085 ('admin', 'group.admin', 'none'),
2086 2086
2087 2087 ('none', 'group.none', 'all'),
2088 2088 ('read', 'group.read', 'all'),
2089 2089 ('write', 'group.write', 'all'),
2090 2090 ('admin', 'group.admin', 'all'),
2091 2091
2092 2092 ('none', 'group.none', 'repos'),
2093 2093 ('read', 'group.read', 'repos'),
2094 2094 ('write', 'group.write', 'repos'),
2095 2095 ('admin', 'group.admin', 'repos'),
2096 2096
2097 2097 ('none', 'group.none', 'groups'),
2098 2098 ('read', 'group.read', 'groups'),
2099 2099 ('write', 'group.write', 'groups'),
2100 2100 ('admin', 'group.admin', 'groups'),
2101 2101 ])
2102 2102 def test_api_grant_user_group_permission_to_repo_group(self, name, perm, apply_to_children):
2103 2103 id_, params = _build_data(self.apikey,
2104 2104 'grant_user_group_permission_to_repo_group',
2105 2105 repogroupid=TEST_REPO_GROUP,
2106 2106 usergroupid=TEST_USER_GROUP,
2107 2107 perm=perm,
2108 2108 apply_to_children=apply_to_children,)
2109 2109 response = api_call(self, params)
2110 2110
2111 2111 ret = {
2112 2112 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2113 2113 perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2114 2114 ),
2115 2115 'success': True
2116 2116 }
2117 2117 expected = ret
2118 2118 self._compare_ok(id_, expected, given=response.body)
2119 2119
2120 2120 @base.parametrize('name,perm,apply_to_children,grant_admin,access_ok', [
2121 2121 ('none_fails', 'group.none', 'none', False, False),
2122 2122 ('read_fails', 'group.read', 'none', False, False),
2123 2123 ('write_fails', 'group.write', 'none', False, False),
2124 2124 ('admin_fails', 'group.admin', 'none', False, False),
2125 2125
2126 2126 # with granted perms
2127 2127 ('none_ok', 'group.none', 'none', True, True),
2128 2128 ('read_ok', 'group.read', 'none', True, True),
2129 2129 ('write_ok', 'group.write', 'none', True, True),
2130 2130 ('admin_ok', 'group.admin', 'none', True, True),
2131 2131 ])
2132 2132 def test_api_grant_user_group_permission_to_repo_group_by_regular_user(
2133 2133 self, name, perm, apply_to_children, grant_admin, access_ok):
2134 2134 if grant_admin:
2135 2135 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
2136 2136 self.TEST_USER_LOGIN,
2137 2137 'group.admin')
2138 2138 meta.Session().commit()
2139 2139
2140 2140 id_, params = _build_data(self.apikey_regular,
2141 2141 'grant_user_group_permission_to_repo_group',
2142 2142 repogroupid=TEST_REPO_GROUP,
2143 2143 usergroupid=TEST_USER_GROUP,
2144 2144 perm=perm,
2145 2145 apply_to_children=apply_to_children,)
2146 2146 response = api_call(self, params)
2147 2147 if access_ok:
2148 2148 ret = {
2149 2149 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2150 2150 perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2151 2151 ),
2152 2152 'success': True
2153 2153 }
2154 2154 expected = ret
2155 2155 self._compare_ok(id_, expected, given=response.body)
2156 2156 else:
2157 2157 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2158 2158 self._compare_error(id_, expected, given=response.body)
2159 2159
2160 2160 def test_api_grant_user_group_permission_to_repo_group_wrong_permission(self):
2161 2161 perm = 'haha.no.permission'
2162 2162 id_, params = _build_data(self.apikey,
2163 2163 'grant_user_group_permission_to_repo_group',
2164 2164 repogroupid=TEST_REPO_GROUP,
2165 2165 usergroupid=TEST_USER_GROUP,
2166 2166 perm=perm)
2167 2167 response = api_call(self, params)
2168 2168
2169 2169 expected = 'permission `%s` does not exist' % perm
2170 2170 self._compare_error(id_, expected, given=response.body)
2171 2171
2172 2172 @mock.patch.object(RepoGroupModel, 'grant_user_group_permission', raise_exception)
2173 2173 def test_api_grant_user_group_permission_exception_when_adding_to_repo_group(self):
2174 2174 perm = 'group.read'
2175 2175 id_, params = _build_data(self.apikey,
2176 2176 'grant_user_group_permission_to_repo_group',
2177 2177 repogroupid=TEST_REPO_GROUP,
2178 2178 usergroupid=TEST_USER_GROUP,
2179 2179 perm=perm)
2180 2180 response = api_call(self, params)
2181 2181
2182 2182 expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2183 2183 TEST_USER_GROUP, TEST_REPO_GROUP
2184 2184 )
2185 2185 self._compare_error(id_, expected, given=response.body)
2186 2186
2187 2187 @base.parametrize('name,apply_to_children', [
2188 2188 ('none', 'none'),
2189 2189 ('all', 'all'),
2190 2190 ('repos', 'repos'),
2191 2191 ('groups', 'groups'),
2192 2192 ])
2193 2193 def test_api_revoke_user_group_permission_from_repo_group(self, name, apply_to_children):
2194 2194 RepoGroupModel().grant_user_group_permission(repo_group=TEST_REPO_GROUP,
2195 2195 group_name=TEST_USER_GROUP,
2196 2196 perm='group.read',)
2197 2197 meta.Session().commit()
2198 2198 id_, params = _build_data(self.apikey,
2199 2199 'revoke_user_group_permission_from_repo_group',
2200 2200 repogroupid=TEST_REPO_GROUP,
2201 2201 usergroupid=TEST_USER_GROUP,
2202 2202 apply_to_children=apply_to_children,)
2203 2203 response = api_call(self, params)
2204 2204
2205 2205 expected = {
2206 2206 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2207 2207 apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2208 2208 ),
2209 2209 'success': True
2210 2210 }
2211 2211 self._compare_ok(id_, expected, given=response.body)
2212 2212
2213 2213 @base.parametrize('name,apply_to_children,grant_admin,access_ok', [
2214 2214 ('none', 'none', False, False),
2215 2215 ('all', 'all', False, False),
2216 2216 ('repos', 'repos', False, False),
2217 2217 ('groups', 'groups', False, False),
2218 2218
2219 2219 # after granting admin rights
2220 2220 ('none', 'none', False, False),
2221 2221 ('all', 'all', False, False),
2222 2222 ('repos', 'repos', False, False),
2223 2223 ('groups', 'groups', False, False),
2224 2224 ])
2225 2225 def test_api_revoke_user_group_permission_from_repo_group_by_regular_user(
2226 2226 self, name, apply_to_children, grant_admin, access_ok):
2227 2227 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
2228 2228 user=base.TEST_USER_ADMIN_LOGIN,
2229 2229 perm='group.read',)
2230 2230 meta.Session().commit()
2231 2231
2232 2232 if grant_admin:
2233 2233 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
2234 2234 self.TEST_USER_LOGIN,
2235 2235 'group.admin')
2236 2236 meta.Session().commit()
2237 2237
2238 2238 id_, params = _build_data(self.apikey_regular,
2239 2239 'revoke_user_group_permission_from_repo_group',
2240 2240 repogroupid=TEST_REPO_GROUP,
2241 2241 usergroupid=TEST_USER_GROUP,
2242 2242 apply_to_children=apply_to_children,)
2243 2243 response = api_call(self, params)
2244 2244 if access_ok:
2245 2245 expected = {
2246 2246 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2247 2247 apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2248 2248 ),
2249 2249 'success': True
2250 2250 }
2251 2251 self._compare_ok(id_, expected, given=response.body)
2252 2252 else:
2253 2253 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2254 2254 self._compare_error(id_, expected, given=response.body)
2255 2255
2256 2256 @mock.patch.object(RepoGroupModel, 'revoke_user_group_permission', raise_exception)
2257 2257 def test_api_revoke_user_group_permission_from_repo_group_exception_when_adding(self):
2258 2258 id_, params = _build_data(self.apikey, 'revoke_user_group_permission_from_repo_group',
2259 2259 repogroupid=TEST_REPO_GROUP,
2260 2260 usergroupid=TEST_USER_GROUP,)
2261 2261 response = api_call(self, params)
2262 2262
2263 2263 expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2264 2264 TEST_USER_GROUP, TEST_REPO_GROUP
2265 2265 )
2266 2266 self._compare_error(id_, expected, given=response.body)
2267 2267
2268 2268 def test_api_get_gist(self):
2269 2269 gist = fixture.create_gist()
2270 2270 gist_id = gist.gist_access_id
2271 2271 gist_created_on = gist.created_on
2272 2272 id_, params = _build_data(self.apikey, 'get_gist',
2273 2273 gistid=gist_id, )
2274 2274 response = api_call(self, params)
2275 2275
2276 2276 expected = {
2277 2277 'access_id': gist_id,
2278 2278 'created_on': gist_created_on,
2279 2279 'description': 'new-gist',
2280 2280 'expires': -1.0,
2281 2281 'gist_id': int(gist_id),
2282 2282 'type': 'public',
2283 2283 'url': 'http://localhost:80/_admin/gists/%s' % gist_id
2284 2284 }
2285 2285
2286 2286 self._compare_ok(id_, expected, given=response.body)
2287 2287
2288 2288 def test_api_get_gist_that_does_not_exist(self):
2289 2289 id_, params = _build_data(self.apikey_regular, 'get_gist',
2290 2290 gistid='12345', )
2291 2291 response = api_call(self, params)
2292 2292 expected = 'gist `%s` does not exist' % ('12345',)
2293 2293 self._compare_error(id_, expected, given=response.body)
2294 2294
2295 2295 def test_api_get_gist_private_gist_without_permission(self):
2296 2296 gist = fixture.create_gist()
2297 2297 gist_id = gist.gist_access_id
2298 2298 gist_created_on = gist.created_on
2299 2299 id_, params = _build_data(self.apikey_regular, 'get_gist',
2300 2300 gistid=gist_id, )
2301 2301 response = api_call(self, params)
2302 2302
2303 2303 expected = 'gist `%s` does not exist' % gist_id
2304 2304 self._compare_error(id_, expected, given=response.body)
2305 2305
2306 2306 def test_api_get_gists(self):
2307 2307 fixture.create_gist()
2308 2308 fixture.create_gist()
2309 2309
2310 2310 id_, params = _build_data(self.apikey, 'get_gists')
2311 2311 response = api_call(self, params)
2312 2312 expected = response.json
2313 2313 assert len(response.json['result']) == 2
2314 2314 #self._compare_ok(id_, expected, given=response.body)
2315 2315
2316 2316 def test_api_get_gists_regular_user(self):
2317 2317 # by admin
2318 2318 fixture.create_gist()
2319 2319 fixture.create_gist()
2320 2320
2321 2321 # by reg user
2322 2322 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2323 2323 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2324 2324 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2325 2325
2326 2326 id_, params = _build_data(self.apikey_regular, 'get_gists')
2327 2327 response = api_call(self, params)
2328 2328 expected = response.json
2329 2329 assert len(response.json['result']) == 3
2330 2330 #self._compare_ok(id_, expected, given=response.body)
2331 2331
2332 2332 def test_api_get_gists_only_for_regular_user(self):
2333 2333 # by admin
2334 2334 fixture.create_gist()
2335 2335 fixture.create_gist()
2336 2336
2337 2337 # by reg user
2338 2338 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2339 2339 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2340 2340 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2341 2341
2342 2342 id_, params = _build_data(self.apikey, 'get_gists',
2343 2343 userid=self.TEST_USER_LOGIN)
2344 2344 response = api_call(self, params)
2345 2345 expected = response.json
2346 2346 assert len(response.json['result']) == 3
2347 2347 #self._compare_ok(id_, expected, given=response.body)
2348 2348
2349 2349 def test_api_get_gists_regular_user_with_different_userid(self):
2350 2350 id_, params = _build_data(self.apikey_regular, 'get_gists',
2351 2351 userid=base.TEST_USER_ADMIN_LOGIN)
2352 2352 response = api_call(self, params)
2353 2353 expected = 'userid is not the same as your user'
2354 2354 self._compare_error(id_, expected, given=response.body)
2355 2355
2356 2356 def test_api_create_gist(self):
2357 2357 id_, params = _build_data(self.apikey_regular, 'create_gist',
2358 2358 lifetime=10,
2359 2359 description='foobar-gist',
2360 2360 gist_type='public',
2361 2361 files={'foobar': {'content': 'foo'}})
2362 2362 response = api_call(self, params)
2363 2363 expected = {
2364 2364 'gist': {
2365 2365 'access_id': response.json['result']['gist']['access_id'],
2366 2366 'created_on': response.json['result']['gist']['created_on'],
2367 2367 'description': 'foobar-gist',
2368 2368 'expires': response.json['result']['gist']['expires'],
2369 2369 'gist_id': response.json['result']['gist']['gist_id'],
2370 2370 'type': 'public',
2371 2371 'url': response.json['result']['gist']['url']
2372 2372 },
2373 2373 'msg': 'created new gist'
2374 2374 }
2375 2375 self._compare_ok(id_, expected, given=response.body)
2376 2376
2377 2377 @mock.patch.object(GistModel, 'create', raise_exception)
2378 2378 def test_api_create_gist_exception_occurred(self):
2379 2379 id_, params = _build_data(self.apikey_regular, 'create_gist',
2380 2380 files={})
2381 2381 response = api_call(self, params)
2382 2382 expected = 'failed to create gist'
2383 2383 self._compare_error(id_, expected, given=response.body)
2384 2384
2385 2385 def test_api_delete_gist(self):
2386 2386 gist_id = fixture.create_gist().gist_access_id
2387 2387 id_, params = _build_data(self.apikey, 'delete_gist',
2388 2388 gistid=gist_id)
2389 2389 response = api_call(self, params)
2390 2390 expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
2391 2391 self._compare_ok(id_, expected, given=response.body)
2392 2392
2393 2393 def test_api_delete_gist_regular_user(self):
2394 2394 gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id
2395 2395 id_, params = _build_data(self.apikey_regular, 'delete_gist',
2396 2396 gistid=gist_id)
2397 2397 response = api_call(self, params)
2398 2398 expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
2399 2399 self._compare_ok(id_, expected, given=response.body)
2400 2400
2401 2401 def test_api_delete_gist_regular_user_no_permission(self):
2402 2402 gist_id = fixture.create_gist().gist_access_id
2403 2403 id_, params = _build_data(self.apikey_regular, 'delete_gist',
2404 2404 gistid=gist_id)
2405 2405 response = api_call(self, params)
2406 2406 expected = 'gist `%s` does not exist' % (gist_id,)
2407 2407 self._compare_error(id_, expected, given=response.body)
2408 2408
2409 2409 @mock.patch.object(GistModel, 'delete', raise_exception)
2410 2410 def test_api_delete_gist_exception_occurred(self):
2411 2411 gist_id = fixture.create_gist().gist_access_id
2412 2412 id_, params = _build_data(self.apikey, 'delete_gist',
2413 2413 gistid=gist_id)
2414 2414 response = api_call(self, params)
2415 2415 expected = 'failed to delete gist ID:%s' % (gist_id,)
2416 2416 self._compare_error(id_, expected, given=response.body)
2417 2417
2418 2418 def test_api_get_ip(self):
2419 2419 id_, params = _build_data(self.apikey, 'get_ip')
2420 2420 response = api_call(self, params)
2421 2421 expected = {
2422 2422 'server_ip_addr': '0.0.0.0',
2423 2423 'user_ips': []
2424 2424 }
2425 2425 self._compare_ok(id_, expected, given=response.body)
2426 2426
2427 2427 def test_api_get_server_info(self):
2428 2428 id_, params = _build_data(self.apikey, 'get_server_info')
2429 2429 response = api_call(self, params)
2430 2430 expected = db.Setting.get_server_info()
2431 2431 self._compare_ok(id_, expected, given=response.body)
2432 2432
2433 2433 def test_api_get_changesets(self):
2434 2434 id_, params = _build_data(self.apikey, 'get_changesets',
2435 2435 repoid=self.REPO, start=0, end=2)
2436 2436 response = api_call(self, params)
2437 2437 result = ext_json.loads(response.body)["result"]
2438 2438 assert len(result) == 3
2439 2439 assert 'message' in result[0]
2440 2440 assert 'added' not in result[0]
2441 2441
2442 2442 def test_api_get_changesets_with_max_revisions(self):
2443 2443 id_, params = _build_data(self.apikey, 'get_changesets',
2444 2444 repoid=self.REPO, start_date="2011-02-24T00:00:00", max_revisions=10)
2445 2445 response = api_call(self, params)
2446 2446 result = ext_json.loads(response.body)["result"]
2447 2447 assert len(result) == 10
2448 2448 assert 'message' in result[0]
2449 2449 assert 'added' not in result[0]
2450 2450
2451 2451 def test_api_get_changesets_with_branch(self):
2452 2452 if self.REPO == 'vcs_test_hg':
2453 2453 branch = 'stable'
2454 2454 else:
2455 2455 pytest.skip("skipping due to missing branches in git test repo")
2456 2456 id_, params = _build_data(self.apikey, 'get_changesets',
2457 2457 repoid=self.REPO, branch_name=branch, start_date="2011-02-24T00:00:00")
2458 2458 response = api_call(self, params)
2459 2459 result = ext_json.loads(response.body)["result"]
2460 2460 assert len(result) == 5
2461 2461 assert 'message' in result[0]
2462 2462 assert 'added' not in result[0]
2463 2463
2464 2464 def test_api_get_changesets_with_file_list(self):
2465 2465 id_, params = _build_data(self.apikey, 'get_changesets',
2466 2466 repoid=self.REPO, start_date="2010-04-07T23:30:30", end_date="2010-04-08T00:31:14", with_file_list=True)
2467 2467 response = api_call(self, params)
2468 2468 result = ext_json.loads(response.body)["result"]
2469 2469 assert len(result) == 3
2470 2470 assert 'message' in result[0]
2471 2471 assert 'added' in result[0]
2472 2472
2473 2473 def test_api_get_changeset(self):
2474 2474 review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
2475 2475 id_, params = _build_data(self.apikey, 'get_changeset',
2476 2476 repoid=self.REPO, raw_id=self.TEST_REVISION)
2477 2477 response = api_call(self, params)
2478 2478 result = ext_json.loads(response.body)["result"]
2479 2479 assert result["raw_id"] == self.TEST_REVISION
2480 2480 assert "reviews" not in result
2481 2481
2482 2482 def test_api_get_changeset_with_reviews(self):
2483 2483 reviewobjs = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
2484 2484 id_, params = _build_data(self.apikey, 'get_changeset',
2485 2485 repoid=self.REPO, raw_id=self.TEST_REVISION,
2486 2486 with_reviews=True)
2487 2487 response = api_call(self, params)
2488 2488 result = ext_json.loads(response.body)["result"]
2489 2489 assert result["raw_id"] == self.TEST_REVISION
2490 2490 assert "reviews" in result
2491 2491 assert len(result["reviews"]) == 1
2492 2492 review = result["reviews"][0]
2493 2493 expected = {
2494 2494 'status': 'approved',
2495 2495 'modified_at': reviewobjs[0].modified_at.replace(microsecond=0).isoformat(),
2496 2496 'reviewer': 'test_admin',
2497 2497 }
2498 2498 assert review == expected
2499 2499
2500 2500 def test_api_get_changeset_that_does_not_exist(self):
2501 2501 """ Fetch changeset status for non-existant changeset.
2502 2502 revision id is the above git hash used in the test above with the
2503 2503 last 3 nibbles replaced with 0xf. Should not exist for git _or_ hg.
2504 2504 """
2505 2505 id_, params = _build_data(self.apikey, 'get_changeset',
2506 2506 repoid=self.REPO, raw_id = '7ab37bc680b4aa72c34d07b230c866c28e9fcfff')
2507 2507 response = api_call(self, params)
2508 2508 expected = 'Changeset %s does not exist' % ('7ab37bc680b4aa72c34d07b230c866c28e9fcfff',)
2509 2509 self._compare_error(id_, expected, given=response.body)
2510 2510
2511 2511 def test_api_get_changeset_without_permission(self):
2512 2512 review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
2513 2513 RepoModel().revoke_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN)
2514 2514 RepoModel().revoke_user_permission(repo=self.REPO, user="default")
2515 2515 id_, params = _build_data(self.apikey_regular, 'get_changeset',
2516 2516 repoid=self.REPO, raw_id=self.TEST_REVISION)
2517 2517 response = api_call(self, params)
2518 2518 expected = 'Access denied to repo %s' % self.REPO
2519 2519 self._compare_error(id_, expected, given=response.body)
2520 2520
2521 2521 def test_api_get_pullrequest(self):
2522 2522 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'get test')
2523 2523 random_id = random.randrange(1, 9999)
2524 2524 params = ascii_bytes(ext_json.dumps({
2525 2525 "id": random_id,
2526 2526 "api_key": self.apikey,
2527 2527 "method": 'get_pullrequest',
2528 2528 "args": {"pullrequest_id": pull_request_id},
2529 2529 }))
2530 2530 response = api_call(self, params)
2531 2531 pullrequest = db.PullRequest().get(pull_request_id)
2532 2532 expected = {
2533 2533 "status": "new",
2534 2534 "pull_request_id": pull_request_id,
2535 2535 "description": "No description",
2536 2536 "url": "/%s/pull-request/%s/_/%s" % (self.REPO, pull_request_id, "stable"),
2537 2537 "reviewers": [{"username": "test_regular"}],
2538 2538 "org_repo_url": "http://localhost:80/%s" % self.REPO,
2539 2539 "org_ref_parts": ["branch", "stable", self.TEST_PR_SRC],
2540 2540 "other_ref_parts": ["branch", "default", self.TEST_PR_DST],
2541 2541 "comments": [{"username": base.TEST_USER_ADMIN_LOGIN, "text": "",
2542 2542 "comment_id": pullrequest.comments[0].comment_id}],
2543 2543 "owner": base.TEST_USER_ADMIN_LOGIN,
2544 2544 "statuses": [{"status": "under_review", "reviewer": base.TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00"} for i in range(0, len(self.TEST_PR_REVISIONS))],
2545 2545 "title": "get test",
2546 2546 "revisions": self.TEST_PR_REVISIONS,
2547 2547 "created_on": "2000-01-01T00:00:00",
2548 2548 "updated_on": "2000-01-01T00:00:00",
2549 2549 }
2550 2550 self._compare_ok(random_id, expected,
2551 2551 given=re.sub(br"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d",
2552 2552 b"2000-01-01T00:00:00", response.body))
2553 2553
2554 2554 def test_api_close_pullrequest(self):
2555 2555 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'close test')
2556 2556 random_id = random.randrange(1, 9999)
2557 2557 params = ascii_bytes(ext_json.dumps({
2558 2558 "id": random_id,
2559 2559 "api_key": self.apikey,
2560 2560 "method": "comment_pullrequest",
2561 2561 "args": {"pull_request_id": pull_request_id, "close_pr": True},
2562 2562 }))
2563 2563 response = api_call(self, params)
2564 2564 self._compare_ok(random_id, True, given=response.body)
2565 2565 pullrequest = db.PullRequest().get(pull_request_id)
2566 2566 assert pullrequest.comments[-1].text == ''
2567 2567 assert pullrequest.status == db.PullRequest.STATUS_CLOSED
2568 2568 assert pullrequest.is_closed() == True
2569 2569
2570 2570 def test_api_status_pullrequest(self):
2571 2571 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "status test")
2572 2572
2573 2573 random_id = random.randrange(1, 9999)
2574 2574 params = ascii_bytes(ext_json.dumps({
2575 2575 "id": random_id,
2576 2576 "api_key": db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN).api_key,
2577 2577 "method": "comment_pullrequest",
2578 2578 "args": {"pull_request_id": pull_request_id, "status": db.ChangesetStatus.STATUS_APPROVED},
2579 2579 }))
2580 2580 response = api_call(self, params)
2581 2581 pullrequest = db.PullRequest().get(pull_request_id)
2582 2582 self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body)
2583 2583 assert db.ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
2584 2584 params = ascii_bytes(ext_json.dumps({
2585 2585 "id": random_id,
2586 2586 "api_key": db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN).api_key,
2587 2587 "method": "comment_pullrequest",
2588 2588 "args": {"pull_request_id": pull_request_id, "status": db.ChangesetStatus.STATUS_APPROVED},
2589 2589 }))
2590 2590 response = api_call(self, params)
2591 2591 self._compare_ok(random_id, True, given=response.body)
2592 2592 pullrequest = db.PullRequest().get(pull_request_id)
2593 2593 assert db.ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
2594 2594
2595 2595 def test_api_comment_pullrequest(self):
2596 2596 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "comment test")
2597 2597 random_id = random.randrange(1, 9999)
2598 2598 params = ascii_bytes(ext_json.dumps({
2599 2599 "id": random_id,
2600 2600 "api_key": self.apikey,
2601 2601 "method": "comment_pullrequest",
2602 2602 "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"},
2603 2603 }))
2604 2604 response = api_call(self, params)
2605 2605 self._compare_ok(random_id, True, given=response.body)
2606 2606 pullrequest = db.PullRequest().get(pull_request_id)
2607 2607 assert pullrequest.comments[-1].text == 'Looks good to me'
2608 2608
2609 2609 def test_api_edit_reviewers_add_single(self):
2610 2610 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2611 2611 pullrequest = db.PullRequest().get(pull_request_id)
2612 2612 pullrequest.owner = self.test_user
2613 2613 random_id = random.randrange(1, 9999)
2614 2614 params = ascii_bytes(ext_json.dumps({
2615 2615 "id": random_id,
2616 2616 "api_key": self.apikey_regular,
2617 2617 "method": "edit_reviewers",
2618 2618 "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
2619 2619 }))
2620 2620 response = api_call(self, params)
2621 2621 expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN], 'already_present': [], 'removed': [] }
2622 2622
2623 2623 self._compare_ok(random_id, expected, given=response.body)
2624 2624 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2625 2625
2626 2626 def test_api_edit_reviewers_add_nonexistent(self):
2627 2627 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2628 2628 pullrequest = db.PullRequest().get(pull_request_id)
2629 2629 pullrequest.owner = self.test_user
2630 2630 random_id = random.randrange(1, 9999)
2631 2631 params = ascii_bytes(ext_json.dumps({
2632 2632 "id": random_id,
2633 2633 "api_key": self.apikey_regular,
2634 2634 "method": "edit_reviewers",
2635 2635 "args": {"pull_request_id": pull_request_id, "add": 999},
2636 2636 }))
2637 2637 response = api_call(self, params)
2638 2638
2639 2639 self._compare_error(random_id, "user `999` does not exist", given=response.body)
2640 2640
2641 2641 def test_api_edit_reviewers_add_multiple(self):
2642 2642 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2643 2643 pullrequest = db.PullRequest().get(pull_request_id)
2644 2644 pullrequest.owner = self.test_user
2645 2645 random_id = random.randrange(1, 9999)
2646 2646 params = ascii_bytes(ext_json.dumps({
2647 2647 "id": random_id,
2648 2648 "api_key": self.apikey_regular,
2649 2649 "method": "edit_reviewers",
2650 2650 "args": {
2651 2651 "pull_request_id": pull_request_id,
2652 2652 "add": [ self.TEST_USER_LOGIN, base.TEST_USER_REGULAR2_LOGIN ]
2653 2653 },
2654 2654 }))
2655 2655 response = api_call(self, params)
2656 2656 # list order depends on python sorting hash, which is randomized
2657 2657 assert set(ext_json.loads(response.body)['result']['added']) == set([base.TEST_USER_REGULAR2_LOGIN, self.TEST_USER_LOGIN])
2658 2658 assert set(ext_json.loads(response.body)['result']['already_present']) == set()
2659 2659 assert set(ext_json.loads(response.body)['result']['removed']) == set()
2660 2660
2661 2661 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2662 2662 assert db.User.get_by_username(self.TEST_USER_LOGIN) in pullrequest.get_reviewer_users()
2663 2663
2664 2664 def test_api_edit_reviewers_add_already_present(self):
2665 2665 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2666 2666 pullrequest = db.PullRequest().get(pull_request_id)
2667 2667 pullrequest.owner = self.test_user
2668 2668 random_id = random.randrange(1, 9999)
2669 2669 params = ascii_bytes(ext_json.dumps({
2670 2670 "id": random_id,
2671 2671 "api_key": self.apikey_regular,
2672 2672 "method": "edit_reviewers",
2673 2673 "args": {
2674 2674 "pull_request_id": pull_request_id,
2675 2675 "add": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN ]
2676 2676 },
2677 2677 }))
2678 2678 response = api_call(self, params)
2679 2679 expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN],
2680 2680 'already_present': [base.TEST_USER_REGULAR_LOGIN],
2681 2681 'removed': [],
2682 2682 }
2683 2683
2684 2684 self._compare_ok(random_id, expected, given=response.body)
2685 2685 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2686 2686 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2687 2687
2688 2688 def test_api_edit_reviewers_add_closed(self):
2689 2689 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2690 2690 pullrequest = db.PullRequest().get(pull_request_id)
2691 2691 pullrequest.owner = self.test_user
2692 2692 PullRequestModel().close_pull_request(pull_request_id)
2693 2693 random_id = random.randrange(1, 9999)
2694 2694 params = ascii_bytes(ext_json.dumps({
2695 2695 "id": random_id,
2696 2696 "api_key": self.apikey_regular,
2697 2697 "method": "edit_reviewers",
2698 2698 "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
2699 2699 }))
2700 2700 response = api_call(self, params)
2701 2701 self._compare_error(random_id, "Cannot edit reviewers of a closed pull request.", given=response.body)
2702 2702
2703 2703 def test_api_edit_reviewers_add_not_owner(self):
2704 2704 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2705 2705 pullrequest = db.PullRequest().get(pull_request_id)
2706 2706 pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
2707 2707 random_id = random.randrange(1, 9999)
2708 2708 params = ascii_bytes(ext_json.dumps({
2709 2709 "id": random_id,
2710 2710 "api_key": self.apikey_regular,
2711 2711 "method": "edit_reviewers",
2712 2712 "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
2713 2713 }))
2714 2714 response = api_call(self, params)
2715 2715 self._compare_error(random_id, "No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.", given=response.body)
2716 2716
2717 2717
2718 2718 def test_api_edit_reviewers_remove_single(self):
2719 2719 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2720 2720 pullrequest = db.PullRequest().get(pull_request_id)
2721 2721 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2722 2722
2723 2723 pullrequest.owner = self.test_user
2724 2724 random_id = random.randrange(1, 9999)
2725 2725 params = ascii_bytes(ext_json.dumps({
2726 2726 "id": random_id,
2727 2727 "api_key": self.apikey_regular,
2728 2728 "method": "edit_reviewers",
2729 2729 "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
2730 2730 }))
2731 2731 response = api_call(self, params)
2732 2732
2733 2733 expected = { 'added': [],
2734 2734 'already_present': [],
2735 2735 'removed': [base.TEST_USER_REGULAR_LOGIN],
2736 2736 }
2737 2737 self._compare_ok(random_id, expected, given=response.body)
2738 2738 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
2739 2739
2740 2740 def test_api_edit_reviewers_remove_nonexistent(self):
2741 2741 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2742 2742 pullrequest = db.PullRequest().get(pull_request_id)
2743 2743 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2744 2744
2745 2745 pullrequest.owner = self.test_user
2746 2746 random_id = random.randrange(1, 9999)
2747 2747 params = ascii_bytes(ext_json.dumps({
2748 2748 "id": random_id,
2749 2749 "api_key": self.apikey_regular,
2750 2750 "method": "edit_reviewers",
2751 2751 "args": {"pull_request_id": pull_request_id, "remove": 999},
2752 2752 }))
2753 2753 response = api_call(self, params)
2754 2754
2755 2755 self._compare_error(random_id, "user `999` does not exist", given=response.body)
2756 2756 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2757 2757
2758 2758 def test_api_edit_reviewers_remove_nonpresent(self):
2759 2759 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2760 2760 pullrequest = db.PullRequest().get(pull_request_id)
2761 2761 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2762 2762 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2763 2763
2764 2764 pullrequest.owner = self.test_user
2765 2765 random_id = random.randrange(1, 9999)
2766 2766 params = ascii_bytes(ext_json.dumps({
2767 2767 "id": random_id,
2768 2768 "api_key": self.apikey_regular,
2769 2769 "method": "edit_reviewers",
2770 2770 "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR2_LOGIN},
2771 2771 }))
2772 2772 response = api_call(self, params)
2773 2773
2774 2774 # NOTE: no explicit indication that removed user was not even a reviewer
2775 2775 expected = { 'added': [],
2776 2776 'already_present': [],
2777 2777 'removed': [base.TEST_USER_REGULAR2_LOGIN],
2778 2778 }
2779 2779 self._compare_ok(random_id, expected, given=response.body)
2780 2780 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2781 2781 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2782 2782
2783 2783 def test_api_edit_reviewers_remove_multiple(self):
2784 2784 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2785 2785 pullrequest = db.PullRequest().get(pull_request_id)
2786 2786 prr = db.PullRequestReviewer(db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN), pullrequest)
2787 2787 meta.Session().add(prr)
2788 2788 meta.Session().commit()
2789 2789
2790 2790 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2791 2791 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2792 2792
2793 2793 pullrequest.owner = self.test_user
2794 2794 random_id = random.randrange(1, 9999)
2795 2795 params = ascii_bytes(ext_json.dumps({
2796 2796 "id": random_id,
2797 2797 "api_key": self.apikey_regular,
2798 2798 "method": "edit_reviewers",
2799 2799 "args": {"pull_request_id": pull_request_id, "remove": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN ] },
2800 2800 }))
2801 2801 response = api_call(self, params)
2802 2802
2803 2803 # list order depends on python sorting hash, which is randomized
2804 2804 assert set(ext_json.loads(response.body)['result']['added']) == set()
2805 2805 assert set(ext_json.loads(response.body)['result']['already_present']) == set()
2806 2806 assert set(ext_json.loads(response.body)['result']['removed']) == set([base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN])
2807 2807 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
2808 2808 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2809 2809
2810 2810 def test_api_edit_reviewers_remove_closed(self):
2811 2811 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2812 2812 pullrequest = db.PullRequest().get(pull_request_id)
2813 2813 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2814 2814 PullRequestModel().close_pull_request(pull_request_id)
2815 2815
2816 2816 pullrequest.owner = self.test_user
2817 2817 random_id = random.randrange(1, 9999)
2818 2818 params = ascii_bytes(ext_json.dumps({
2819 2819 "id": random_id,
2820 2820 "api_key": self.apikey_regular,
2821 2821 "method": "edit_reviewers",
2822 2822 "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
2823 2823 }))
2824 2824 response = api_call(self, params)
2825 2825
2826 2826 self._compare_error(random_id, "Cannot edit reviewers of a closed pull request.", given=response.body)
2827 2827 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2828 2828
2829 2829 def test_api_edit_reviewers_remove_not_owner(self):
2830 2830 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2831 2831 pullrequest = db.PullRequest().get(pull_request_id)
2832 2832 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2833 2833
2834 2834 pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
2835 2835 random_id = random.randrange(1, 9999)
2836 2836 params = ascii_bytes(ext_json.dumps({
2837 2837 "id": random_id,
2838 2838 "api_key": self.apikey_regular,
2839 2839 "method": "edit_reviewers",
2840 2840 "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
2841 2841 }))
2842 2842 response = api_call(self, params)
2843 2843
2844 2844 self._compare_error(random_id, "No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.", given=response.body)
2845 2845 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2846 2846
2847 2847 def test_api_edit_reviewers_add_remove_single(self):
2848 2848 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2849 2849 pullrequest = db.PullRequest().get(pull_request_id)
2850 2850 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2851 2851 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2852 2852
2853 2853 pullrequest.owner = self.test_user
2854 2854 random_id = random.randrange(1, 9999)
2855 2855 params = ascii_bytes(ext_json.dumps({
2856 2856 "id": random_id,
2857 2857 "api_key": self.apikey_regular,
2858 2858 "method": "edit_reviewers",
2859 2859 "args": {"pull_request_id": pull_request_id,
2860 2860 "add": base.TEST_USER_REGULAR2_LOGIN,
2861 2861 "remove": base.TEST_USER_REGULAR_LOGIN
2862 2862 },
2863 2863 }))
2864 2864 response = api_call(self, params)
2865 2865
2866 2866 expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN],
2867 2867 'already_present': [],
2868 2868 'removed': [base.TEST_USER_REGULAR_LOGIN],
2869 2869 }
2870 2870 self._compare_ok(random_id, expected, given=response.body)
2871 2871 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
2872 2872 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2873 2873
2874 2874 def test_api_edit_reviewers_add_remove_multiple(self):
2875 2875 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2876 2876 pullrequest = db.PullRequest().get(pull_request_id)
2877 2877 prr = db.PullRequestReviewer(db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN), pullrequest)
2878 2878 meta.Session().add(prr)
2879 2879 meta.Session().commit()
2880 2880 assert db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) in pullrequest.get_reviewer_users()
2881 2881 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2882 2882 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2883 2883
2884 2884 pullrequest.owner = self.test_user
2885 2885 random_id = random.randrange(1, 9999)
2886 2886 params = ascii_bytes(ext_json.dumps({
2887 2887 "id": random_id,
2888 2888 "api_key": self.apikey_regular,
2889 2889 "method": "edit_reviewers",
2890 2890 "args": {"pull_request_id": pull_request_id,
2891 2891 "add": [ base.TEST_USER_REGULAR2_LOGIN ],
2892 2892 "remove": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_ADMIN_LOGIN ],
2893 2893 },
2894 2894 }))
2895 2895 response = api_call(self, params)
2896 2896
2897 2897 # list order depends on python sorting hash, which is randomized
2898 2898 assert set(ext_json.loads(response.body)['result']['added']) == set([base.TEST_USER_REGULAR2_LOGIN])
2899 2899 assert set(ext_json.loads(response.body)['result']['already_present']) == set()
2900 2900 assert set(ext_json.loads(response.body)['result']['removed']) == set([base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_ADMIN_LOGIN])
2901 2901 assert db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) not in pullrequest.get_reviewer_users()
2902 2902 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
2903 2903 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2904 2904
2905 2905 def test_api_edit_reviewers_invalid_params(self):
2906 2906 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2907 2907 pullrequest = db.PullRequest().get(pull_request_id)
2908 2908 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2909 2909
2910 2910 pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
2911 2911 random_id = random.randrange(1, 9999)
2912 2912 params = ascii_bytes(ext_json.dumps({
2913 2913 "id": random_id,
2914 2914 "api_key": self.apikey_regular,
2915 2915 "method": "edit_reviewers",
2916 2916 "args": {"pull_request_id": pull_request_id},
2917 2917 }))
2918 2918 response = api_call(self, params)
2919 2919
2920 2920 self._compare_error(random_id, "Invalid request. Neither 'add' nor 'remove' is specified.", given=response.body)
2921 2921 assert ext_json.loads(response.body)['result'] is None
@@ -1,150 +1,151 b''
1 1 from kallithea.model import db, meta
2 2 from kallithea.model.repo_group import RepoGroupModel
3 3 from kallithea.tests import base
4 4 from kallithea.tests.fixture import Fixture
5 5
6 6
7 7 fixture = Fixture()
8 8
9 9
10 10 class TestRepoGroupsController(base.TestController):
11 11
12 12 def test_index(self):
13 13 self.log_user()
14 14 response = self.app.get(base.url('repos_groups'))
15 15 response.mustcontain('"records": []')
16 16
17 17 def test_new(self):
18 18 self.log_user()
19 19 response = self.app.get(base.url('new_repos_group'))
20 20
21 21 def test_create(self):
22 22 self.log_user()
23 23
24 24 group_name = 'foo'
25 25
26 26 # creation with form error
27 27 response = self.app.post(base.url('repos_groups'),
28 28 {'group_name': group_name,
29 29 '_session_csrf_secret_token': self.session_csrf_secret_token()})
30 30 response.mustcontain('name="group_name" type="text" value="%s"' % group_name)
31 31 response.mustcontain('<!-- for: group_description -->')
32 32
33 33 # creation
34 34 response = self.app.post(base.url('repos_groups'),
35 35 {'group_name': group_name,
36 36 'group_description': 'lala',
37 37 'parent_group_id': '-1',
38 38 'group_copy_permissions': 'True',
39 39 '_session_csrf_secret_token': self.session_csrf_secret_token()})
40 40 self.checkSessionFlash(response, 'Created repository group %s' % group_name)
41 41
42 42 # edit form
43 43 response = self.app.get(base.url('edit_repo_group', group_name=group_name))
44 44 response.mustcontain('>lala<')
45 45
46 46 # edit with form error
47 47 response = self.app.post(base.url('update_repos_group', group_name=group_name),
48 48 {'group_name': group_name,
49 49 '_session_csrf_secret_token': self.session_csrf_secret_token()})
50 50 response.mustcontain('name="group_name" type="text" value="%s"' % group_name)
51 51 response.mustcontain('<!-- for: group_description -->')
52 52
53 53 # edit
54 54 response = self.app.post(base.url('update_repos_group', group_name=group_name),
55 55 {'group_name': group_name,
56 'owner': base.TEST_USER_REGULAR2_LOGIN,
56 57 'group_description': 'lolo',
57 58 '_session_csrf_secret_token': self.session_csrf_secret_token()})
58 59 self.checkSessionFlash(response, 'Updated repository group %s' % group_name)
59 60 response = response.follow()
60 61 response.mustcontain('name="group_name" type="text" value="%s"' % group_name)
61 62 response.mustcontain(no='<!-- for: group_description -->')
62 63 response.mustcontain('>lolo<')
63 64
64 65 # listing
65 66 response = self.app.get(base.url('repos_groups'))
66 67 response.mustcontain('raw_name": "%s"' % group_name)
67 68
68 69 # show
69 70 response = self.app.get(base.url('repos_group', group_name=group_name))
70 71 response.mustcontain('href="/_admin/repo_groups/%s/edit"' % group_name)
71 72
72 73 # show ignores extra trailing slashes in the URL
73 74 response = self.app.get(base.url('repos_group', group_name='%s//' % group_name))
74 75 response.mustcontain('href="/_admin/repo_groups/%s/edit"' % group_name)
75 76
76 77 # delete
77 78 response = self.app.post(base.url('delete_repo_group', group_name=group_name),
78 79 {'_session_csrf_secret_token': self.session_csrf_secret_token()})
79 80 self.checkSessionFlash(response, 'Removed repository group %s' % group_name)
80 81
81 82 def test_new_by_regular_user(self):
82 83 self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
83 84 response = self.app.get(base.url('new_repos_group'), status=403)
84 85
85 86 def test_case_insensitivity(self):
86 87 self.log_user()
87 88 group_name = 'newgroup'
88 89 response = self.app.post(base.url('repos_groups'),
89 90 fixture._get_repo_group_create_params(group_name=group_name,
90 91 _session_csrf_secret_token=self.session_csrf_secret_token()))
91 92 # try to create repo group with swapped case
92 93 swapped_group_name = group_name.swapcase()
93 94 response = self.app.post(base.url('repos_groups'),
94 95 fixture._get_repo_group_create_params(group_name=swapped_group_name,
95 96 _session_csrf_secret_token=self.session_csrf_secret_token()))
96 97 response.mustcontain('already exists')
97 98
98 99 RepoGroupModel().delete(group_name)
99 100 meta.Session().commit()
100 101
101 102 def test_subgroup_deletion(self):
102 103 self.log_user()
103 104 parent = None
104 105 parent_name = 'parent'
105 106 sub = None
106 107 sub_name = 'sub'
107 108 sub_path = 'parent/sub'
108 109
109 110 try:
110 111 # create parent group
111 112 assert db.RepoGroup.guess_instance(parent_name) is None
112 113 response = self.app.post(
113 114 base.url('repos_groups'),
114 115 fixture._get_repo_group_create_params(
115 116 group_name=parent_name,
116 117 _session_csrf_secret_token=self.session_csrf_secret_token()
117 118 )
118 119 )
119 120 parent = db.RepoGroup.guess_instance(parent_name)
120 121 assert parent is not None
121 122
122 123 # create sub group
123 124 assert db.RepoGroup.guess_instance(sub_path) is None
124 125 response = self.app.post(
125 126 base.url('repos_groups'),
126 127 fixture._get_repo_group_create_params(
127 128 group_name=sub_name,
128 129 parent_group_id=parent.group_id,
129 130 _session_csrf_secret_token=self.session_csrf_secret_token()
130 131 )
131 132 )
132 133 sub = db.RepoGroup.guess_instance(sub_path)
133 134 assert sub is not None
134 135
135 136 # delete sub group
136 137 response = self.app.post(
137 138 base.url('delete_repo_group', group_name=sub_path),
138 139 params={
139 140 '_session_csrf_secret_token': self.session_csrf_secret_token()
140 141 },
141 142 )
142 143 sub = db.RepoGroup.guess_instance(sub_path)
143 144 assert sub is None
144 145
145 146 finally:
146 147 if sub:
147 148 RepoGroupModel().delete(sub)
148 149 if parent:
149 150 RepoGroupModel().delete(parent)
150 151 meta.Session().commit()
General Comments 0
You need to be logged in to leave comments. Login now