##// END OF EJS Templates
repo group: fix API for updating parent...
toras9000 -
r8733:fdc9c2fd stable
parent child Browse files
Show More
@@ -1,188 +1,189 b''
1 1 List of contributors to Kallithea project:
2 2
3 3 Mads Kiilerich <mads@kiilerich.com> 2016-2022
4 4 Manuel Jacob <me@manueljacob.de> 2019-2020 2022
5 toras9000 <toras9000@gmail.com> 2022
5 6 Thomas De Schampheleire <thomas.de_schampheleire@nokia.com> 2014-2021
6 7 ssantos <ssantos@web.de> 2018-2021
7 8 Private <adamantine.sword@gmail.com> 2019-2021
8 9 Γ‰tienne Gilli <etienne@gilli.io> 2020-2021
9 10 fresh <fresh190@protonmail.com> 2020-2021
10 11 robertus <robertuss12@gmail.com> 2020-2021
11 12 Eugenia Russell <eugenia.russell2019@gmail.com> 2021
12 13 Michalis <michalisntovas@yahoo.gr> 2021
13 14 vs <vsuhachev@yandex.ru> 2021
14 15 АлСксандр <akonn7@mail.ru> 2021
15 16 Asterios Dimitriou <steve@pci.gr> 2016-2017 2020
16 17 Allan NordhΓΈy <epost@anotheragency.no> 2017-2020
17 18 Anton Schur <tonich.sh@gmail.com> 2017 2020
18 19 Artem <kovalevartem.ru@gmail.com> 2020
19 20 David Ignjić <ignjic@gmail.com> 2020
20 21 Dennis Fink <dennis.fink@c3l.lu> 2020
21 22 J. Lavoie <j.lavoie@net-c.ca> 2020
22 23 Ross Thomas <ross@lns-nevasoft.com> 2020
23 24 Tim Ooms <tatankat@users.noreply.github.com> 2020
24 25 Andrej Shadura <andrew@shadura.me> 2012 2014-2017 2019
25 26 Γ‰tienne Gilli <etienne.gilli@gmail.com> 2015-2017 2019
26 27 Adi Kriegisch <adi@cg.tuwien.ac.at> 2019
27 28 Danni Randeris <danniranderis@gmail.com> 2019
28 29 Edmund Wong <ewong@crazy-cat.org> 2019
29 30 Elizabeth Sherrock <lizzyd710@gmail.com> 2019
30 31 Hüseyin Tunç <huseyin.tunc@bulutfon.com> 2019
31 32 leela <53352@protonmail.com> 2019
32 33 Mateusz Mendel <mendelm9@gmail.com> 2019
33 34 Nathan <bonnemainsnathan@gmail.com> 2019
34 35 Oleksandr Shtalinberg <o.shtalinberg@gmail.com> 2019
35 36 THANOS SIOURDAKIS <siourdakisthanos@gmail.com> 2019
36 37 Wolfgang Scherer <wolfgang.scherer@gmx.de> 2019
37 38 Π₯ристо Π‘Ρ‚Π°Π½Π΅Π² <hstanev@gmail.com> 2019
38 39 Dominik Ruf <dominikruf@gmail.com> 2012 2014-2018
39 40 Michal ČihaΕ™ <michal@cihar.com> 2014-2015 2018
40 41 Branko Majic <branko@majic.rs> 2015 2018
41 42 Chris Rule <crule@aegistg.com> 2018
42 43 JesΓΊs SΓ‘nchez <jsanchezfdz95@gmail.com> 2018
43 44 Patrick Vane <patrick_vane@lowentry.com> 2018
44 45 Pheng Heong Tan <phtan90@gmail.com> 2018
45 46 Максим Π―ΠΊΠΈΠΌΡ‡ΡƒΠΊ <xpinovo@gmail.com> 2018
46 47 ΠœΠ°Ρ€Ρ Π―ΠΌΠ±Π°Ρ€ <mjambarmeta@gmail.com> 2018
47 48 Mads Kiilerich <madski@unity3d.com> 2012-2017
48 49 Unity Technologies 2012-2017
49 50 SΓΈren LΓΈvborg <sorenl@unity3d.com> 2015-2017
50 51 Sam Jaques <sam.jaques@me.com> 2015 2017
51 52 Alessandro Molina <alessandro.molina@axant.it> 2017
52 53 Ching-Chen Mao <mao@lins.fju.edu.tw> 2017
53 54 Eivind Tagseth <eivindt@gmail.com> 2017
54 55 FUJIWARA Katsunori <foozy@lares.dti.ne.jp> 2017
55 56 Holger Schramm <info@schramm.by> 2017
56 57 Karl Goetz <karl@kgoetz.id.au> 2017
57 58 Lars Kruse <devel@sumpfralle.de> 2017
58 59 Marko Semet <markosemet@googlemail.com> 2017
59 60 Viktar Vauchkevich <victorenator@gmail.com> 2017
60 61 Takumi IINO <trot.thunder@gmail.com> 2012-2016
61 62 Jan Heylen <heyleke@gmail.com> 2015-2016
62 63 Robert Martinez <ntttq@inboxen.org> 2015-2016
63 64 Robert Rauch <mail@robertrauch.de> 2015-2016
64 65 Angel Ezquerra <angel.ezquerra@gmail.com> 2016
65 66 Anton Shestakov <av6@dwimlabs.net> 2016
66 67 Brandon Jones <bjones14@gmail.com> 2016
67 68 Kateryna Musina <kateryna@unity3d.com> 2016
68 69 Konstantin Veretennicov <kveretennicov@gmail.com> 2016
69 70 Oscar Curero <oscar@naiandei.net> 2016
70 71 Robert James Dennington <tinytimrob@googlemail.com> 2016
71 72 timeless@gmail.com 2016
72 73 YFdyh000 <yfdyh000@gmail.com> 2016
73 74 Aras Pranckevičius <aras@unity3d.com> 2012-2013 2015
74 75 Sean Farley <sean.michael.farley@gmail.com> 2013-2015
75 76 Bradley M. Kuhn <bkuhn@sfconservancy.org> 2014-2015
76 77 Christian Oyarzun <oyarzun@gmail.com> 2014-2015
77 78 Joseph Rivera <rivera.d.joseph@gmail.com> 2014-2015
78 79 Anatoly Bubenkov <bubenkoff@gmail.com> 2015
79 80 Andrew Bartlett <abartlet@catalyst.net.nz> 2015
80 81 BalÑzs Úr <urbalazs@gmail.com> 2015
81 82 Ben Finney <ben@benfinney.id.au> 2015
82 83 Daniel Hobley <danielh@unity3d.com> 2015
83 84 David Avigni <david.avigni@ankapi.com> 2015
84 85 Denis Blanchette <dblanchette@coveo.com> 2015
85 86 duanhongyi <duanhongyi@doopai.com> 2015
86 87 EriCSN Chang <ericsning@gmail.com> 2015
87 88 Grzegorz Krason <grzegorz.krason@gmail.com> 2015
88 89 JiΕ™Γ­ Suchan <yed@vanyli.net> 2015
89 90 Kazunari Kobayashi <kobanari@nifty.com> 2015
90 91 Kevin Bullock <kbullock@ringworld.org> 2015
91 92 kobanari <kobanari@nifty.com> 2015
92 93 Marc Abramowitz <marc@marc-abramowitz.com> 2015
93 94 Marc Villetard <marc.villetard@gmail.com> 2015
94 95 Matthias Zilk <matthias.zilk@gmail.com> 2015
95 96 Michael Pohl <michael@mipapo.de> 2015
96 97 Michael V. DePalatis <mike@depalatis.net> 2015
97 98 Morten Skaaning <mortens@unity3d.com> 2015
98 99 Nick High <nick@silverchip.org> 2015
99 100 Niemand Jedermann <predatorix@web.de> 2015
100 101 Peter Vitt <petervitt@web.de> 2015
101 102 Ronny Pfannschmidt <opensource@ronnypfannschmidt.de> 2015
102 103 Tuux <tuxa@galaxie.eu.org> 2015
103 104 Viktar Palstsiuk <vipals@gmail.com> 2015
104 105 Ante Ilic <ante@unity3d.com> 2014
105 106 Calinou <calinou@opmbx.org> 2014
106 107 Daniel Anderson <daniel@dattrix.com> 2014
107 108 Henrik Stuart <hg@hstuart.dk> 2014
108 109 Ingo von Borstel <kallithea@planetmaker.de> 2014
109 110 invision70 <invision70@gmail.com> 2014
110 111 Jelmer VernooΔ³ <jelmer@samba.org> 2014
111 112 Jim Hague <jim.hague@acm.org> 2014
112 113 Matt Fellows <kallithea@matt-fellows.me.uk> 2014
113 114 Max Roman <max@choloclos.se> 2014
114 115 Na'Tosha Bard <natosha@unity3d.com> 2014
115 116 Rasmus Selsmark <rasmuss@unity3d.com> 2014
116 117 SkryabinD <skryabind@gmail.com> 2014
117 118 Tim Freund <tim@freunds.net> 2014
118 119 Travis Burtrum <android@moparisthebest.com> 2014
119 120 whosaysni <whosaysni@gmail.com> 2014
120 121 Zoltan Gyarmati <mr.zoltan.gyarmati@gmail.com> 2014
121 122 Marcin KuΕΊmiΕ„ski <marcin@python-works.com> 2010-2013
122 123 Nemcio <areczek01@gmail.com> 2012-2013
123 124 xpol <xpolife@gmail.com> 2012-2013
124 125 Andrey Mivrenik <myvrenik@gmail.com> 2013
125 126 Aparkar <aparkar@icloud.com> 2013
126 127 ArcheR <aleclitvinov1980@gmail.com> 2013
127 128 Dennis Brakhane <brakhane@googlemail.com> 2013
128 129 gnustavo <gustavo@gnustavo.com> 2013
129 130 Grzegorz RoΕΌniecki <xaerxess@gmail.com> 2013
130 131 Ilya Beda <ir4y.ix@gmail.com> 2013
131 132 ivlevdenis <ivlevdenis.ru@gmail.com> 2013
132 133 Jonathan Sternberg <jonathansternberg@gmail.com> 2013
133 134 Leonardo Carneiro <leonardo@unity3d.com> 2013
134 135 Magnus Ericmats <magnus.ericmats@gmail.com> 2013
135 136 Martin Vium <martinv@unity3d.com> 2013
136 137 Mikhail Zholobov <legal90@gmail.com> 2013
137 138 mokeev1995 <mokeev_andre@mail.ru> 2013
138 139 Ruslan Bekenev <furyinbox@gmail.com> 2013
139 140 shirou - しろう 2013
140 141 Simon Lopez <simon.lopez@slopez.org> 2013
141 142 softforwinxp <softforwinxp@gmail.com> 2013
142 143 stephanj <info@stephan-jauernick.de> 2013
143 144 Ton Plomp <tcplomp@gmail.com> 2013
144 145 zhmylove <zhmylove@narod.ru> 2013
145 146 こいんとす <tkondou@gmail.com> 2013
146 147 Augusto Herrmann <augusto.herrmann@planejamento.gov.br> 2011-2012
147 148 Augusto Herrmann <augusto.herrmann@gmail.com> 2012
148 149 Dan Sheridan <djs@adelard.com> 2012
149 150 Dies Koper <diesk@fast.au.fujitsu.com> 2012
150 151 Erwin Kroon <e.kroon@smartmetersolutions.nl> 2012
151 152 H Waldo G <gwaldo@gmail.com> 2012
152 153 hppj <hppj@postmage.biz> 2012
153 154 Indra Talip <indra.talip@gmail.com> 2012
154 155 mikespook <mikespook@gmail.com> 2012
155 156 nansenat16 <nansenat16@null.tw> 2012
156 157 Nemcio <bogdan114@g.pl> 2012
157 158 Philip Jameson <philip.j@hostdime.com> 2012
158 159 Raoul Thill <raoul.thill@gmail.com> 2012
159 160 Stefan Engel <mail@engel-stefan.de> 2012
160 161 Tony Bussieres <t.bussieres@gmail.com> 2012
161 162 Vincent Caron <vcaron@bearstech.com> 2012
162 163 Vincent Duvert <vincent@duvert.net> 2012
163 164 Vladislav Poluhin <nuklea@gmail.com> 2012
164 165 Zachary Auclair <zach101@gmail.com> 2012
165 166 Ankit Solanki <ankit.solanki@gmail.com> 2011
166 167 Dmitri Kuznetsov 2011
167 168 Jared Bunting <jared.bunting@peachjean.com> 2011
168 169 Jason Harris <jason@jasonfharris.com> 2011
169 170 Les Peabody <lpeabody@gmail.com> 2011
170 171 Liad Shani <liadff@gmail.com> 2011
171 172 Lorenzo M. Catucci <lorenzo@sancho.ccd.uniroma2.it> 2011
172 173 Matt Zuba <matt.zuba@goodwillaz.org> 2011
173 174 Nicolas VINOT <aeris@imirhil.fr> 2011
174 175 Shawn K. O'Shea <shawn@eth0.net> 2011
175 176 Thayne Harbaugh <thayne@fusionio.com> 2011
176 177 Łukasz Balcerzak <lukaszbalcerzak@gmail.com> 2010
177 178 Andrew Kesterson <andrew@aklabs.net>
178 179 cejones
179 180 David A. SjΓΈen <david.sjoen@westcon.no>
180 181 James Rhodes <jrhodes@redpointsoftware.com.au>
181 182 Jonas Oberschweiber <jonas.oberschweiber@d-velop.de>
182 183 larikale
183 184 RhodeCode GmbH
184 185 Sebastian Kreutzberger <sebastian@rhodecode.com>
185 186 Steve Romanow <slestak989@gmail.com>
186 187 SteveCohen
187 188 Thomas <thomas@rhodecode.com>
188 189 Thomas Waldmann <tw-public@gmx.de>
@@ -1,2391 +1,2392 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.controllers.api.api
16 16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
17 17
18 18 API controller for Kallithea
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Aug 20, 2011
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28 import logging
29 29 import traceback
30 30 from datetime import datetime
31 31
32 32 from tg import request
33 33
34 34 from kallithea.controllers.api import JSONRPCController, JSONRPCError
35 35 from kallithea.lib.auth import (AuthUser, HasPermissionAny, HasPermissionAnyDecorator, HasRepoGroupPermissionLevel, HasRepoPermissionLevel,
36 36 HasUserGroupPermissionLevel)
37 37 from kallithea.lib.exceptions import DefaultUserException, UserGroupsAssignedException
38 38 from kallithea.lib.utils import repo2db_mapper
39 39 from kallithea.lib.vcs.backends.base import EmptyChangeset
40 40 from kallithea.lib.vcs.exceptions import EmptyRepositoryError
41 41 from kallithea.model import db, meta, userlog
42 42 from kallithea.model.changeset_status import ChangesetStatusModel
43 43 from kallithea.model.comment import ChangesetCommentsModel
44 44 from kallithea.model.gist import GistModel
45 45 from kallithea.model.pull_request import PullRequestModel
46 46 from kallithea.model.repo import RepoModel
47 47 from kallithea.model.repo_group import RepoGroupModel
48 48 from kallithea.model.scm import ScmModel, UserGroupList
49 49 from kallithea.model.user import UserModel
50 50 from kallithea.model.user_group import UserGroupModel
51 51
52 52
53 53 log = logging.getLogger(__name__)
54 54
55 55
56 56 def store_update(updates, attr, name):
57 57 """
58 58 Stores param in updates dict if it's not None (i.e. if user explicitly set
59 59 a parameter). This allows easy updates of passed in params.
60 60 """
61 61 if attr is not None:
62 62 updates[name] = attr
63 63
64 64
65 65 def get_user_or_error(userid):
66 66 """
67 67 Get user by id or name or return JsonRPCError if not found
68 68
69 69 :param userid:
70 70 """
71 71 user = UserModel().get_user(userid)
72 72 if user is None:
73 73 raise JSONRPCError("user `%s` does not exist" % (userid,))
74 74 return user
75 75
76 76
77 77 def get_repo_or_error(repoid):
78 78 """
79 79 Get repo by id or name or return JsonRPCError if not found
80 80
81 81 :param repoid:
82 82 """
83 83 repo = RepoModel().get_repo(repoid)
84 84 if repo is None:
85 85 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
86 86 return repo
87 87
88 88
89 89 def get_repo_group_or_error(repogroupid):
90 90 """
91 91 Get repo group by id or name or return JsonRPCError if not found
92 92
93 93 :param repogroupid:
94 94 """
95 95 repo_group = db.RepoGroup.guess_instance(repogroupid)
96 96 if repo_group is None:
97 97 raise JSONRPCError(
98 98 'repository group `%s` does not exist' % (repogroupid,))
99 99 return repo_group
100 100
101 101
102 102 def get_user_group_or_error(usergroupid):
103 103 """
104 104 Get user group by id or name or return JsonRPCError if not found
105 105
106 106 :param usergroupid:
107 107 """
108 108 user_group = UserGroupModel().get_group(usergroupid)
109 109 if user_group is None:
110 110 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
111 111 return user_group
112 112
113 113
114 114 def get_perm_or_error(permid, prefix=None):
115 115 """
116 116 Get permission by id or name or return JsonRPCError if not found
117 117
118 118 :param permid:
119 119 """
120 120 perm = db.Permission.get_by_key(permid)
121 121 if perm is None:
122 122 raise JSONRPCError('permission `%s` does not exist' % (permid,))
123 123 if prefix:
124 124 if not perm.permission_name.startswith(prefix):
125 125 raise JSONRPCError('permission `%s` is invalid, '
126 126 'should start with %s' % (permid, prefix))
127 127 return perm
128 128
129 129
130 130 def get_gist_or_error(gistid):
131 131 """
132 132 Get gist by id or gist_access_id or return JsonRPCError if not found
133 133
134 134 :param gistid:
135 135 """
136 136 gist = GistModel().get_gist(gistid)
137 137 if gist is None:
138 138 raise JSONRPCError('gist `%s` does not exist' % (gistid,))
139 139 return gist
140 140
141 141
142 142 class ApiController(JSONRPCController):
143 143 """
144 144 API Controller
145 145
146 146 The authenticated user can be found as request.authuser.
147 147
148 148 Example function::
149 149
150 150 def func(arg1, arg2,...):
151 151 pass
152 152
153 153 Each function should also **raise** JSONRPCError for any
154 154 errors that happens.
155 155 """
156 156
157 157 @HasPermissionAnyDecorator('hg.admin')
158 158 def test(self, args):
159 159 return args
160 160
161 161 @HasPermissionAnyDecorator('hg.admin')
162 162 def pull(self, repoid, clone_uri=None):
163 163 """
164 164 Triggers a pull from remote location on given repo. Can be used to
165 165 automatically keep remote repos up to date. This command can be executed
166 166 only using api_key belonging to user with admin rights
167 167
168 168 :param repoid: repository name or repository id
169 169 :type repoid: str or int
170 170 :param clone_uri: repository URI to pull from (optional)
171 171 :type clone_uri: str
172 172
173 173 OUTPUT::
174 174
175 175 id : <id_given_in_input>
176 176 result : {
177 177 "msg" : "Pulled from `<repository name>`",
178 178 "repository" : "<repository name>"
179 179 }
180 180 error : null
181 181
182 182 ERROR OUTPUT::
183 183
184 184 id : <id_given_in_input>
185 185 result : null
186 186 error : {
187 187 "Unable to pull changes from `<reponame>`"
188 188 }
189 189 """
190 190 repo = get_repo_or_error(repoid)
191 191
192 192 try:
193 193 ScmModel().pull_changes(repo.repo_name,
194 194 request.authuser.username,
195 195 request.ip_addr,
196 196 clone_uri=clone_uri)
197 197 return dict(
198 198 msg='Pulled from `%s`' % repo.repo_name,
199 199 repository=repo.repo_name
200 200 )
201 201 except Exception:
202 202 log.error(traceback.format_exc())
203 203 raise JSONRPCError(
204 204 'Unable to pull changes from `%s`' % repo.repo_name
205 205 )
206 206
207 207 @HasPermissionAnyDecorator('hg.admin')
208 208 def rescan_repos(self, remove_obsolete=False):
209 209 """
210 210 Triggers rescan repositories action. If remove_obsolete is set
211 211 than also delete repos that are in database but not in the filesystem.
212 212 aka "clean zombies". This command can be executed only using api_key
213 213 belonging to user with admin rights.
214 214
215 215 :param remove_obsolete: deletes repositories from
216 216 database that are not found on the filesystem
217 217 :type remove_obsolete: Optional(bool)
218 218
219 219 OUTPUT::
220 220
221 221 id : <id_given_in_input>
222 222 result : {
223 223 'added': [<added repository name>,...]
224 224 'removed': [<removed repository name>,...]
225 225 }
226 226 error : null
227 227
228 228 ERROR OUTPUT::
229 229
230 230 id : <id_given_in_input>
231 231 result : null
232 232 error : {
233 233 'Error occurred during rescan repositories action'
234 234 }
235 235 """
236 236 try:
237 237 rm_obsolete = remove_obsolete
238 238 added, removed = repo2db_mapper(ScmModel().repo_scan(),
239 239 remove_obsolete=rm_obsolete)
240 240 return {'added': added, 'removed': removed}
241 241 except Exception:
242 242 log.error(traceback.format_exc())
243 243 raise JSONRPCError(
244 244 'Error occurred during rescan repositories action'
245 245 )
246 246
247 247 def invalidate_cache(self, repoid):
248 248 """
249 249 Invalidate cache for repository.
250 250 This command can be executed only using api_key belonging to user with admin
251 251 rights or regular user that have write or admin or write access to repository.
252 252
253 253 :param repoid: repository name or repository id
254 254 :type repoid: str or int
255 255
256 256 OUTPUT::
257 257
258 258 id : <id_given_in_input>
259 259 result : {
260 260 'msg': Cache for repository `<repository name>` was invalidated,
261 261 'repository': <repository name>
262 262 }
263 263 error : null
264 264
265 265 ERROR OUTPUT::
266 266
267 267 id : <id_given_in_input>
268 268 result : null
269 269 error : {
270 270 'Error occurred during cache invalidation action'
271 271 }
272 272 """
273 273 repo = get_repo_or_error(repoid)
274 274 if not HasPermissionAny('hg.admin')():
275 275 if not HasRepoPermissionLevel('write')(repo.repo_name):
276 276 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
277 277
278 278 try:
279 279 ScmModel().mark_for_invalidation(repo.repo_name)
280 280 return dict(
281 281 msg='Cache for repository `%s` was invalidated' % (repoid,),
282 282 repository=repo.repo_name
283 283 )
284 284 except Exception:
285 285 log.error(traceback.format_exc())
286 286 raise JSONRPCError(
287 287 'Error occurred during cache invalidation action'
288 288 )
289 289
290 290 @HasPermissionAnyDecorator('hg.admin')
291 291 def get_ip(self, userid=None):
292 292 """
293 293 Shows IP address as seen from Kallithea server, together with all
294 294 defined IP addresses for given user. If userid is not passed data is
295 295 returned for user who's calling this function.
296 296 This command can be executed only using api_key belonging to user with
297 297 admin rights.
298 298
299 299 :param userid: username to show ips for
300 300 :type userid: Optional(str or int)
301 301
302 302 OUTPUT::
303 303
304 304 id : <id_given_in_input>
305 305 result : {
306 306 "server_ip_addr" : "<ip_from_client>",
307 307 "user_ips" : [
308 308 {
309 309 "ip_addr" : "<ip_with_mask>",
310 310 "ip_range" : ["<start_ip>", "<end_ip>"]
311 311 },
312 312 ...
313 313 ]
314 314 }
315 315 error : null
316 316 """
317 317 if userid is None:
318 318 userid = request.authuser.user_id
319 319 user = get_user_or_error(userid)
320 320 ips = db.UserIpMap.query().filter(db.UserIpMap.user == user).all()
321 321 return dict(
322 322 server_ip_addr=request.ip_addr,
323 323 user_ips=ips
324 324 )
325 325
326 326 # alias for old
327 327 show_ip = get_ip
328 328
329 329 @HasPermissionAnyDecorator('hg.admin')
330 330 def get_server_info(self):
331 331 """
332 332 return server info, including Kallithea version and installed packages
333 333
334 334 OUTPUT::
335 335
336 336 id : <id_given_in_input>
337 337 result : {
338 338 'modules' : [ [<module name>, <module version>], ...]
339 339 'py_version' : <python version>,
340 340 'platform' : <platform type>,
341 341 'kallithea_version' : <kallithea version>,
342 342 'git_version' : '<git version>',
343 343 'git_path' : '<git path>'
344 344 }
345 345 error : null
346 346 """
347 347 return db.Setting.get_server_info()
348 348
349 349 def get_user(self, userid=None):
350 350 """
351 351 Gets a user by username or user_id, Returns empty result if user is
352 352 not found. If userid param is skipped it is set to id of user who is
353 353 calling this method. This command can be executed only using api_key
354 354 belonging to user with admin rights, or regular users that cannot
355 355 specify different userid than theirs
356 356
357 357 :param userid: user to get data for
358 358 :type userid: Optional(str or int)
359 359
360 360 OUTPUT::
361 361
362 362 id : <id_given_in_input>
363 363 result : None if user does not exist or
364 364 {
365 365 "user_id" : "<user_id>",
366 366 "username" : "<username>",
367 367 "firstname" : "<firstname>",
368 368 "lastname" : "<lastname>",
369 369 "email" : "<email>",
370 370 "emails" : "[<list of all emails including additional ones>]",
371 371 "active" : "<bool: user active>",
372 372 "admin" : "<bool: user is admin>",
373 373 "permissions" : {
374 374 "global" : ["hg.create.repository",
375 375 "repository.read",
376 376 "hg.register.manual_activate"],
377 377 "repositories" : {"repo1" : "repository.none"},
378 378 "repositories_groups" : {"Group1" : "group.read"},
379 379 "user_groups" : { "usrgrp1" : "usergroup.admin" }
380 380 }
381 381 }
382 382 error : null
383 383 """
384 384 if not HasPermissionAny('hg.admin')():
385 385 # make sure normal user does not pass someone else userid,
386 386 # he is not allowed to do that
387 387 if userid is not None and userid != request.authuser.user_id:
388 388 raise JSONRPCError(
389 389 'userid is not the same as your user'
390 390 )
391 391
392 392 if userid is None:
393 393 userid = request.authuser.user_id
394 394
395 395 user = get_user_or_error(userid)
396 396 data = user.get_api_data()
397 397 data['permissions'] = AuthUser(user_id=user.user_id).permissions
398 398 return data
399 399
400 400 @HasPermissionAnyDecorator('hg.admin')
401 401 def get_users(self):
402 402 """
403 403 Lists all existing users. This command can be executed only using api_key
404 404 belonging to user with admin rights.
405 405
406 406 OUTPUT::
407 407
408 408 id : <id_given_in_input>
409 409 result : [<user_object>, ...]
410 410 error : null
411 411 """
412 412 return [
413 413 user.get_api_data()
414 414 for user in db.User.query()
415 415 .order_by(db.User.username)
416 416 .filter_by(is_default_user=False)
417 417 ]
418 418
419 419 @HasPermissionAnyDecorator('hg.admin')
420 420 def create_user(self, username, email, password='',
421 421 firstname='', lastname='',
422 422 active=True, admin=False,
423 423 extern_type=db.User.DEFAULT_AUTH_TYPE,
424 424 extern_name=''):
425 425 """
426 426 Creates new user. Returns new user object. This command can
427 427 be executed only using api_key belonging to user with admin rights.
428 428
429 429 :param username: new username
430 430 :type username: str or int
431 431 :param email: email
432 432 :type email: str
433 433 :param password: password
434 434 :type password: Optional(str)
435 435 :param firstname: firstname
436 436 :type firstname: str
437 437 :param lastname: lastname
438 438 :type lastname: str
439 439 :param active: active
440 440 :type active: Optional(bool)
441 441 :param admin: admin
442 442 :type admin: Optional(bool)
443 443 :param extern_name: name of extern
444 444 :type extern_name: Optional(str)
445 445 :param extern_type: extern_type
446 446 :type extern_type: Optional(str)
447 447
448 448 OUTPUT::
449 449
450 450 id : <id_given_in_input>
451 451 result : {
452 452 "msg" : "created new user `<username>`",
453 453 "user" : <user_obj>
454 454 }
455 455 error : null
456 456
457 457 ERROR OUTPUT::
458 458
459 459 id : <id_given_in_input>
460 460 result : null
461 461 error : {
462 462 "user `<username>` already exist"
463 463 or
464 464 "email `<email>` already exist"
465 465 or
466 466 "failed to create user `<username>`"
467 467 }
468 468 """
469 469 if db.User.get_by_username(username):
470 470 raise JSONRPCError("user `%s` already exist" % (username,))
471 471
472 472 if db.User.get_by_email(email):
473 473 raise JSONRPCError("email `%s` already exist" % (email,))
474 474
475 475 try:
476 476 user = UserModel().create_or_update(
477 477 username=username,
478 478 password=password,
479 479 email=email,
480 480 firstname=firstname,
481 481 lastname=lastname,
482 482 active=active,
483 483 admin=admin,
484 484 extern_type=extern_type,
485 485 extern_name=extern_name
486 486 )
487 487 meta.Session().commit()
488 488 return dict(
489 489 msg='created new user `%s`' % username,
490 490 user=user.get_api_data()
491 491 )
492 492 except Exception:
493 493 log.error(traceback.format_exc())
494 494 raise JSONRPCError('failed to create user `%s`' % (username,))
495 495
496 496 @HasPermissionAnyDecorator('hg.admin')
497 497 def update_user(self, userid, username=None,
498 498 email=None, password=None,
499 499 firstname=None, lastname=None,
500 500 active=None, admin=None,
501 501 extern_type=None, extern_name=None):
502 502 """
503 503 updates given user if such user exists. This command can
504 504 be executed only using api_key belonging to user with admin rights.
505 505
506 506 :param userid: userid to update
507 507 :type userid: str or int
508 508 :param username: new username
509 509 :type username: Optional(str or int)
510 510 :param email: email
511 511 :type email: Optional(str)
512 512 :param password: password
513 513 :type password: Optional(str)
514 514 :param firstname: firstname
515 515 :type firstname: Optional(str)
516 516 :param lastname: lastname
517 517 :type lastname: Optional(str)
518 518 :param active: active
519 519 :type active: Optional(bool)
520 520 :param admin: admin
521 521 :type admin: Optional(bool)
522 522 :param extern_name:
523 523 :type extern_name: Optional(str)
524 524 :param extern_type:
525 525 :type extern_type: Optional(str)
526 526
527 527 OUTPUT::
528 528
529 529 id : <id_given_in_input>
530 530 result : {
531 531 "msg" : "updated user ID:<userid> <username>",
532 532 "user" : <user_object>
533 533 }
534 534 error : null
535 535
536 536 ERROR OUTPUT::
537 537
538 538 id : <id_given_in_input>
539 539 result : null
540 540 error : {
541 541 "failed to update user `<username>`"
542 542 }
543 543 """
544 544 user = get_user_or_error(userid)
545 545
546 546 # only non optional arguments will be stored in updates
547 547 updates = {}
548 548
549 549 try:
550 550
551 551 store_update(updates, username, 'username')
552 552 store_update(updates, password, 'password')
553 553 store_update(updates, email, 'email')
554 554 store_update(updates, firstname, 'name')
555 555 store_update(updates, lastname, 'lastname')
556 556 store_update(updates, active, 'active')
557 557 store_update(updates, admin, 'admin')
558 558 store_update(updates, extern_name, 'extern_name')
559 559 store_update(updates, extern_type, 'extern_type')
560 560
561 561 user = UserModel().update_user(user, **updates)
562 562 meta.Session().commit()
563 563 return dict(
564 564 msg='updated user ID:%s %s' % (user.user_id, user.username),
565 565 user=user.get_api_data()
566 566 )
567 567 except DefaultUserException:
568 568 log.error(traceback.format_exc())
569 569 raise JSONRPCError('editing default user is forbidden')
570 570 except Exception:
571 571 log.error(traceback.format_exc())
572 572 raise JSONRPCError('failed to update user `%s`' % (userid,))
573 573
574 574 @HasPermissionAnyDecorator('hg.admin')
575 575 def delete_user(self, userid):
576 576 """
577 577 deletes given user if such user exists. This command can
578 578 be executed only using api_key belonging to user with admin rights.
579 579
580 580 :param userid: user to delete
581 581 :type userid: str or int
582 582
583 583 OUTPUT::
584 584
585 585 id : <id_given_in_input>
586 586 result : {
587 587 "msg" : "deleted user ID:<userid> <username>",
588 588 "user" : null
589 589 }
590 590 error : null
591 591
592 592 ERROR OUTPUT::
593 593
594 594 id : <id_given_in_input>
595 595 result : null
596 596 error : {
597 597 "failed to delete user ID:<userid> <username>"
598 598 }
599 599 """
600 600 user = get_user_or_error(userid)
601 601
602 602 try:
603 603 UserModel().delete(userid)
604 604 meta.Session().commit()
605 605 return dict(
606 606 msg='deleted user ID:%s %s' % (user.user_id, user.username),
607 607 user=None
608 608 )
609 609 except Exception:
610 610
611 611 log.error(traceback.format_exc())
612 612 raise JSONRPCError('failed to delete user ID:%s %s'
613 613 % (user.user_id, user.username))
614 614
615 615 # permission check inside
616 616 def get_user_group(self, usergroupid):
617 617 """
618 618 Gets an existing user group. This command can be executed only using api_key
619 619 belonging to user with admin rights or user who has at least
620 620 read access to user group.
621 621
622 622 :param usergroupid: id of user_group to edit
623 623 :type usergroupid: str or int
624 624
625 625 OUTPUT::
626 626
627 627 id : <id_given_in_input>
628 628 result : None if group not exist
629 629 {
630 630 "users_group_id" : "<id>",
631 631 "group_name" : "<groupname>",
632 632 "group_description" : "<description>",
633 633 "active" : "<bool>",
634 634 "owner" : "<username>",
635 635 "members" : [<user_obj>,...]
636 636 }
637 637 error : null
638 638 """
639 639 user_group = get_user_group_or_error(usergroupid)
640 640 if not HasPermissionAny('hg.admin')():
641 641 if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
642 642 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
643 643
644 644 data = user_group.get_api_data()
645 645 return data
646 646
647 647 # permission check inside
648 648 def get_user_groups(self):
649 649 """
650 650 Lists all existing user groups. This command can be executed only using
651 651 api_key belonging to user with admin rights or user who has at least
652 652 read access to user group.
653 653
654 654 OUTPUT::
655 655
656 656 id : <id_given_in_input>
657 657 result : [<user_group_obj>,...]
658 658 error : null
659 659 """
660 660 return [
661 661 user_group.get_api_data()
662 662 for user_group in UserGroupList(db.UserGroup.query().all(), perm_level='read')
663 663 ]
664 664
665 665 @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
666 666 def create_user_group(self, group_name, description='',
667 667 owner=None, active=True):
668 668 """
669 669 Creates new user group. This command can be executed only using api_key
670 670 belonging to user with admin rights or an user who has create user group
671 671 permission
672 672
673 673 :param group_name: name of new user group
674 674 :type group_name: str
675 675 :param description: group description
676 676 :type description: Optional(str)
677 677 :param owner: owner of group. If not passed apiuser is the owner
678 678 :type owner: Optional(str or int)
679 679 :param active: group is active
680 680 :type active: Optional(bool)
681 681
682 682 OUTPUT::
683 683
684 684 id : <id_given_in_input>
685 685 result : {
686 686 "msg" : "created new user group `<groupname>`",
687 687 "user_group" : <user_group_object>
688 688 }
689 689 error : null
690 690
691 691 ERROR OUTPUT::
692 692
693 693 id : <id_given_in_input>
694 694 result : null
695 695 error : {
696 696 "user group `<group name>` already exist"
697 697 or
698 698 "failed to create group `<group name>`"
699 699 }
700 700 """
701 701 if UserGroupModel().get_by_name(group_name):
702 702 raise JSONRPCError("user group `%s` already exist" % (group_name,))
703 703
704 704 try:
705 705 if owner is None:
706 706 owner = request.authuser.user_id
707 707
708 708 owner = get_user_or_error(owner)
709 709 ug = UserGroupModel().create(name=group_name, description=description,
710 710 owner=owner, active=active)
711 711 meta.Session().commit()
712 712 return dict(
713 713 msg='created new user group `%s`' % group_name,
714 714 user_group=ug.get_api_data()
715 715 )
716 716 except Exception:
717 717 log.error(traceback.format_exc())
718 718 raise JSONRPCError('failed to create group `%s`' % (group_name,))
719 719
720 720 # permission check inside
721 721 def update_user_group(self, usergroupid, group_name=None,
722 722 description=None, owner=None,
723 723 active=None):
724 724 """
725 725 Updates given usergroup. This command can be executed only using api_key
726 726 belonging to user with admin rights or an admin of given user group
727 727
728 728 :param usergroupid: id of user group to update
729 729 :type usergroupid: str or int
730 730 :param group_name: name of new user group
731 731 :type group_name: str
732 732 :param description: group description
733 733 :type description: str
734 734 :param owner: owner of group.
735 735 :type owner: Optional(str or int)
736 736 :param active: group is active
737 737 :type active: Optional(bool)
738 738
739 739 OUTPUT::
740 740
741 741 id : <id_given_in_input>
742 742 result : {
743 743 "msg" : 'updated user group ID:<user group id> <user group name>',
744 744 "user_group" : <user_group_object>
745 745 }
746 746 error : null
747 747
748 748 ERROR OUTPUT::
749 749
750 750 id : <id_given_in_input>
751 751 result : null
752 752 error : {
753 753 "failed to update user group `<user group name>`"
754 754 }
755 755 """
756 756 user_group = get_user_group_or_error(usergroupid)
757 757 if not HasPermissionAny('hg.admin')():
758 758 if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
759 759 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
760 760
761 761 if owner is not None:
762 762 owner = get_user_or_error(owner)
763 763
764 764 updates = {}
765 765 store_update(updates, group_name, 'users_group_name')
766 766 store_update(updates, description, 'user_group_description')
767 767 store_update(updates, owner, 'owner')
768 768 store_update(updates, active, 'users_group_active')
769 769 try:
770 770 UserGroupModel().update(user_group, updates)
771 771 meta.Session().commit()
772 772 return dict(
773 773 msg='updated user group ID:%s %s' % (user_group.users_group_id,
774 774 user_group.users_group_name),
775 775 user_group=user_group.get_api_data()
776 776 )
777 777 except Exception:
778 778 log.error(traceback.format_exc())
779 779 raise JSONRPCError('failed to update user group `%s`' % (usergroupid,))
780 780
781 781 # permission check inside
782 782 def delete_user_group(self, usergroupid):
783 783 """
784 784 Delete given user group by user group id or name.
785 785 This command can be executed only using api_key
786 786 belonging to user with admin rights or an admin of given user group
787 787
788 788 :param usergroupid:
789 789 :type usergroupid: str or int
790 790
791 791 OUTPUT::
792 792
793 793 id : <id_given_in_input>
794 794 result : {
795 795 "msg" : "deleted user group ID:<user_group_id> <user_group_name>"
796 796 }
797 797 error : null
798 798
799 799 ERROR OUTPUT::
800 800
801 801 id : <id_given_in_input>
802 802 result : null
803 803 error : {
804 804 "failed to delete user group ID:<user_group_id> <user_group_name>"
805 805 or
806 806 "RepoGroup assigned to <repo_groups_list>"
807 807 }
808 808 """
809 809 user_group = get_user_group_or_error(usergroupid)
810 810 if not HasPermissionAny('hg.admin')():
811 811 if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
812 812 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
813 813
814 814 try:
815 815 UserGroupModel().delete(user_group)
816 816 meta.Session().commit()
817 817 return dict(
818 818 msg='deleted user group ID:%s %s' %
819 819 (user_group.users_group_id, user_group.users_group_name),
820 820 user_group=None
821 821 )
822 822 except UserGroupsAssignedException as e:
823 823 log.error(traceback.format_exc())
824 824 raise JSONRPCError(str(e))
825 825 except Exception:
826 826 log.error(traceback.format_exc())
827 827 raise JSONRPCError('failed to delete user group ID:%s %s' %
828 828 (user_group.users_group_id,
829 829 user_group.users_group_name)
830 830 )
831 831
832 832 # permission check inside
833 833 def add_user_to_user_group(self, usergroupid, userid):
834 834 """
835 835 Adds a user to a user group. If user exists in that group success will be
836 836 `false`. This command can be executed only using api_key
837 837 belonging to user with admin rights or an admin of a given user group
838 838
839 839 :param usergroupid:
840 840 :type usergroupid: str or int
841 841 :param userid:
842 842 :type userid: str or int
843 843
844 844 OUTPUT::
845 845
846 846 id : <id_given_in_input>
847 847 result : {
848 848 "success" : True|False # depends on if member is in group
849 849 "msg" : "added member `<username>` to a user group `<groupname>` |
850 850 User is already in that group"
851 851 }
852 852 error : null
853 853
854 854 ERROR OUTPUT::
855 855
856 856 id : <id_given_in_input>
857 857 result : null
858 858 error : {
859 859 "failed to add member to user group `<user_group_name>`"
860 860 }
861 861 """
862 862 user = get_user_or_error(userid)
863 863 user_group = get_user_group_or_error(usergroupid)
864 864 if not HasPermissionAny('hg.admin')():
865 865 if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
866 866 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
867 867
868 868 try:
869 869 ugm = UserGroupModel().add_user_to_group(user_group, user)
870 870 success = True if ugm is not True else False
871 871 msg = 'added member `%s` to user group `%s`' % (
872 872 user.username, user_group.users_group_name
873 873 )
874 874 msg = msg if success else 'User is already in that group'
875 875 meta.Session().commit()
876 876
877 877 return dict(
878 878 success=success,
879 879 msg=msg
880 880 )
881 881 except Exception:
882 882 log.error(traceback.format_exc())
883 883 raise JSONRPCError(
884 884 'failed to add member to user group `%s`' % (
885 885 user_group.users_group_name,
886 886 )
887 887 )
888 888
889 889 # permission check inside
890 890 def remove_user_from_user_group(self, usergroupid, userid):
891 891 """
892 892 Removes a user from a user group. If user is not in given group success will
893 893 be `false`. This command can be executed only
894 894 using api_key belonging to user with admin rights or an admin of given user group
895 895
896 896 :param usergroupid:
897 897 :param userid:
898 898
899 899 OUTPUT::
900 900
901 901 id : <id_given_in_input>
902 902 result : {
903 903 "success" : True|False, # depends on if member is in group
904 904 "msg" : "removed member <username> from user group <groupname> |
905 905 User wasn't in group"
906 906 }
907 907 error : null
908 908 """
909 909 user = get_user_or_error(userid)
910 910 user_group = get_user_group_or_error(usergroupid)
911 911 if not HasPermissionAny('hg.admin')():
912 912 if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
913 913 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
914 914
915 915 try:
916 916 success = UserGroupModel().remove_user_from_group(user_group, user)
917 917 msg = 'removed member `%s` from user group `%s`' % (
918 918 user.username, user_group.users_group_name
919 919 )
920 920 msg = msg if success else "User wasn't in group"
921 921 meta.Session().commit()
922 922 return dict(success=success, msg=msg)
923 923 except Exception:
924 924 log.error(traceback.format_exc())
925 925 raise JSONRPCError(
926 926 'failed to remove member from user group `%s`' % (
927 927 user_group.users_group_name,
928 928 )
929 929 )
930 930
931 931 # permission check inside
932 932 def get_repo(self, repoid,
933 933 with_revision_names=False,
934 934 with_pullrequests=False):
935 935 """
936 936 Gets an existing repository by it's name or repository_id. Members will return
937 937 either users_group or user associated to that repository. This command can be
938 938 executed only using api_key belonging to user with admin
939 939 rights or regular user that have at least read access to repository.
940 940
941 941 :param repoid: repository name or repository id
942 942 :type repoid: str or int
943 943
944 944 OUTPUT::
945 945
946 946 id : <id_given_in_input>
947 947 result : {
948 948 "repo_id" : "<repo_id>",
949 949 "repo_name" : "<reponame>",
950 950 "repo_type" : "<repo_type>",
951 951 "clone_uri" : "<clone_uri>",
952 952 "enable_downloads" : "<bool>",
953 953 "enable_statistics": "<bool>",
954 954 "private" : "<bool>",
955 955 "created_on" : "<date_time_created>",
956 956 "description" : "<description>",
957 957 "landing_rev" : "<landing_rev>",
958 958 "last_changeset" : {
959 959 "author" : "<full_author>",
960 960 "date" : "<date_time_of_commit>",
961 961 "message" : "<commit_message>",
962 962 "raw_id" : "<raw_id>",
963 963 "revision": "<numeric_revision>",
964 964 "short_id": "<short_id>"
965 965 },
966 966 "owner" : "<repo_owner>",
967 967 "fork_of" : "<name_of_fork_parent>",
968 968 "members" : [
969 969 {
970 970 "name" : "<username>",
971 971 "type" : "user",
972 972 "permission" : "repository.(read|write|admin)"
973 973 },
974 974 …
975 975 {
976 976 "name" : "<usergroup name>",
977 977 "type" : "user_group",
978 978 "permission" : "usergroup.(read|write|admin)"
979 979 },
980 980 …
981 981 ],
982 982 "followers" : [<user_obj>, ...],
983 983 <if with_revision_names == True>
984 984 "tags" : {
985 985 "<tagname>" : "<raw_id>",
986 986 ...
987 987 },
988 988 "branches" : {
989 989 "<branchname>" : "<raw_id>",
990 990 ...
991 991 },
992 992 "bookmarks" : {
993 993 "<bookmarkname>" : "<raw_id>",
994 994 ...
995 995 }
996 996 }
997 997 error : null
998 998 """
999 999 repo = get_repo_or_error(repoid)
1000 1000
1001 1001 if not HasPermissionAny('hg.admin')():
1002 1002 if not HasRepoPermissionLevel('read')(repo.repo_name):
1003 1003 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
1004 1004
1005 1005 members = []
1006 1006 for user in repo.repo_to_perm:
1007 1007 perm = user.permission.permission_name
1008 1008 user = user.user
1009 1009 user_data = {
1010 1010 'name': user.username,
1011 1011 'type': "user",
1012 1012 'permission': perm
1013 1013 }
1014 1014 members.append(user_data)
1015 1015
1016 1016 for user_group in repo.users_group_to_perm:
1017 1017 perm = user_group.permission.permission_name
1018 1018 user_group = user_group.users_group
1019 1019 user_group_data = {
1020 1020 'name': user_group.users_group_name,
1021 1021 'type': "user_group",
1022 1022 'permission': perm
1023 1023 }
1024 1024 members.append(user_group_data)
1025 1025
1026 1026 followers = [
1027 1027 uf.user.get_api_data()
1028 1028 for uf in repo.followers
1029 1029 ]
1030 1030
1031 1031 data = repo.get_api_data(with_revision_names=with_revision_names,
1032 1032 with_pullrequests=with_pullrequests)
1033 1033 data['members'] = members
1034 1034 data['followers'] = followers
1035 1035 return data
1036 1036
1037 1037 # permission check inside
1038 1038 def get_repos(self):
1039 1039 """
1040 1040 Lists all existing repositories. This command can be executed only using
1041 1041 api_key belonging to user with admin rights or regular user that have
1042 1042 admin, write or read access to repository.
1043 1043
1044 1044 OUTPUT::
1045 1045
1046 1046 id : <id_given_in_input>
1047 1047 result : [
1048 1048 {
1049 1049 "repo_id" : "<repo_id>",
1050 1050 "repo_name" : "<reponame>",
1051 1051 "repo_type" : "<repo_type>",
1052 1052 "clone_uri" : "<clone_uri>",
1053 1053 "private" : "<bool>",
1054 1054 "created_on" : "<datetimecreated>",
1055 1055 "description" : "<description>",
1056 1056 "landing_rev" : "<landing_rev>",
1057 1057 "owner" : "<repo_owner>",
1058 1058 "fork_of" : "<name_of_fork_parent>",
1059 1059 "enable_downloads" : "<bool>",
1060 1060 "enable_statistics": "<bool>"
1061 1061 },
1062 1062 …
1063 1063 ]
1064 1064 error : null
1065 1065 """
1066 1066 if not HasPermissionAny('hg.admin')():
1067 1067 repos = request.authuser.get_all_user_repos()
1068 1068 else:
1069 1069 repos = db.Repository.query()
1070 1070
1071 1071 return [
1072 1072 repo.get_api_data()
1073 1073 for repo in repos
1074 1074 ]
1075 1075
1076 1076 # permission check inside
1077 1077 def get_repo_nodes(self, repoid, revision, root_path,
1078 1078 ret_type='all'):
1079 1079 """
1080 1080 returns a list of nodes and it's children in a flat list for a given path
1081 1081 at given revision. It's possible to specify ret_type to show only `files` or
1082 1082 `dirs`. This command can be executed only using api_key belonging to
1083 1083 user with admin rights or regular user that have at least read access to repository.
1084 1084
1085 1085 :param repoid: repository name or repository id
1086 1086 :type repoid: str or int
1087 1087 :param revision: revision for which listing should be done
1088 1088 :type revision: str
1089 1089 :param root_path: path from which start displaying
1090 1090 :type root_path: str
1091 1091 :param ret_type: return type 'all|files|dirs' nodes
1092 1092 :type ret_type: Optional(str)
1093 1093
1094 1094 OUTPUT::
1095 1095
1096 1096 id : <id_given_in_input>
1097 1097 result : [
1098 1098 {
1099 1099 "name" : "<name>",
1100 1100 "type" : "<type>"
1101 1101 },
1102 1102 …
1103 1103 ]
1104 1104 error : null
1105 1105 """
1106 1106 repo = get_repo_or_error(repoid)
1107 1107
1108 1108 if not HasPermissionAny('hg.admin')():
1109 1109 if not HasRepoPermissionLevel('read')(repo.repo_name):
1110 1110 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
1111 1111
1112 1112 _map = {}
1113 1113 try:
1114 1114 _d, _f = ScmModel().get_nodes(repo, revision, root_path,
1115 1115 flat=False)
1116 1116 _map = {
1117 1117 'all': _d + _f,
1118 1118 'files': _f,
1119 1119 'dirs': _d,
1120 1120 }
1121 1121 return _map[ret_type]
1122 1122 except KeyError:
1123 1123 raise JSONRPCError('ret_type must be one of %s'
1124 1124 % (','.join(sorted(_map))))
1125 1125 except Exception:
1126 1126 log.error(traceback.format_exc())
1127 1127 raise JSONRPCError(
1128 1128 'failed to get repo: `%s` nodes' % repo.repo_name
1129 1129 )
1130 1130
1131 1131 # permission check inside
1132 1132 def create_repo(self, repo_name, owner=None,
1133 1133 repo_type=None, description='',
1134 1134 private=False, clone_uri=None,
1135 1135 landing_rev='rev:tip',
1136 1136 enable_statistics=None,
1137 1137 enable_downloads=None,
1138 1138 copy_permissions=False):
1139 1139 """
1140 1140 Creates a repository. The repository name contains the full path, but the
1141 1141 parent repository group must exist. For example "foo/bar/baz" require the groups
1142 1142 "foo" and "bar" (with "foo" as parent), and create "baz" repository with
1143 1143 "bar" as group. This command can be executed only using api_key
1144 1144 belonging to user with admin rights or regular user that have create
1145 1145 repository permission. Regular users cannot specify owner parameter
1146 1146
1147 1147 :param repo_name: repository name
1148 1148 :type repo_name: str
1149 1149 :param owner: user_id or username
1150 1150 :type owner: Optional(str)
1151 1151 :param repo_type: 'hg' or 'git'
1152 1152 :type repo_type: Optional(str)
1153 1153 :param description: repository description
1154 1154 :type description: Optional(str)
1155 1155 :param private:
1156 1156 :type private: bool
1157 1157 :param clone_uri:
1158 1158 :type clone_uri: str
1159 1159 :param landing_rev: <rev_type>:<rev>
1160 1160 :type landing_rev: str
1161 1161 :param enable_downloads:
1162 1162 :type enable_downloads: bool
1163 1163 :param enable_statistics:
1164 1164 :type enable_statistics: bool
1165 1165 :param copy_permissions: Copy permission from group that repository is
1166 1166 being created.
1167 1167 :type copy_permissions: bool
1168 1168
1169 1169 OUTPUT::
1170 1170
1171 1171 id : <id_given_in_input>
1172 1172 result : {
1173 1173 "msg" : "Created new repository `<reponame>`",
1174 1174 "success" : true
1175 1175 }
1176 1176 error : null
1177 1177
1178 1178 ERROR OUTPUT::
1179 1179
1180 1180 id : <id_given_in_input>
1181 1181 result : null
1182 1182 error : {
1183 1183 'failed to create repository `<repo_name>`
1184 1184 }
1185 1185 """
1186 1186 group_name = None
1187 1187 repo_name_parts = repo_name.split('/')
1188 1188 if len(repo_name_parts) > 1:
1189 1189 group_name = '/'.join(repo_name_parts[:-1])
1190 1190 repo_group = db.RepoGroup.get_by_group_name(group_name)
1191 1191 if repo_group is None:
1192 1192 raise JSONRPCError("repo group `%s` not found" % group_name)
1193 1193 if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(group_name)):
1194 1194 raise JSONRPCError("no permission to create repo in %s" % group_name)
1195 1195 else:
1196 1196 if not HasPermissionAny('hg.admin', 'hg.create.repository')():
1197 1197 raise JSONRPCError("no permission to create top level repo")
1198 1198
1199 1199 if not HasPermissionAny('hg.admin')():
1200 1200 if owner is not None:
1201 1201 # forbid setting owner for non-admins
1202 1202 raise JSONRPCError(
1203 1203 'Only Kallithea admin can specify `owner` param'
1204 1204 )
1205 1205 if owner is None:
1206 1206 owner = request.authuser.user_id
1207 1207
1208 1208 owner = get_user_or_error(owner)
1209 1209
1210 1210 if RepoModel().get_by_repo_name(repo_name):
1211 1211 raise JSONRPCError("repo `%s` already exist" % repo_name)
1212 1212
1213 1213 defs = db.Setting.get_default_repo_settings(strip_prefix=True)
1214 1214 if private is None:
1215 1215 private = defs.get('repo_private') or False
1216 1216 if repo_type is None:
1217 1217 repo_type = defs.get('repo_type')
1218 1218 if enable_statistics is None:
1219 1219 enable_statistics = defs.get('repo_enable_statistics')
1220 1220 if enable_downloads is None:
1221 1221 enable_downloads = defs.get('repo_enable_downloads')
1222 1222
1223 1223 try:
1224 1224 data = dict(
1225 1225 repo_name=repo_name_parts[-1],
1226 1226 repo_name_full=repo_name,
1227 1227 repo_type=repo_type,
1228 1228 repo_description=description,
1229 1229 repo_private=private,
1230 1230 clone_uri=clone_uri,
1231 1231 repo_group=group_name,
1232 1232 repo_landing_rev=landing_rev,
1233 1233 repo_enable_statistics=enable_statistics,
1234 1234 repo_enable_downloads=enable_downloads,
1235 1235 repo_copy_permissions=copy_permissions,
1236 1236 )
1237 1237
1238 1238 RepoModel().create(form_data=data, cur_user=owner.username)
1239 1239 # no commit, it's done in RepoModel, or async via celery
1240 1240 return dict(
1241 1241 msg="Created new repository `%s`" % (repo_name,),
1242 1242 success=True, # cannot return the repo data here since fork
1243 1243 # can be done async
1244 1244 )
1245 1245 except Exception:
1246 1246 log.error(traceback.format_exc())
1247 1247 raise JSONRPCError(
1248 1248 'failed to create repository `%s`' % (repo_name,))
1249 1249
1250 1250 # permission check inside
1251 1251 def update_repo(self, repoid, name=None,
1252 1252 owner=None,
1253 1253 group=None,
1254 1254 description=None, private=None,
1255 1255 clone_uri=None, landing_rev=None,
1256 1256 enable_statistics=None,
1257 1257 enable_downloads=None):
1258 1258 """
1259 1259 Updates repo
1260 1260
1261 1261 :param repoid: repository name or repository id
1262 1262 :type repoid: str or int
1263 1263 :param name:
1264 1264 :param owner:
1265 1265 :param group:
1266 1266 :param description:
1267 1267 :param private:
1268 1268 :param clone_uri:
1269 1269 :param landing_rev:
1270 1270 :param enable_statistics:
1271 1271 :param enable_downloads:
1272 1272 """
1273 1273 repo = get_repo_or_error(repoid)
1274 1274 if not HasPermissionAny('hg.admin')():
1275 1275 if not HasRepoPermissionLevel('admin')(repo.repo_name):
1276 1276 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
1277 1277
1278 1278 if (name != repo.repo_name and repo.group_id is None and
1279 1279 not HasPermissionAny('hg.create.repository')()
1280 1280 ):
1281 1281 raise JSONRPCError('no permission to create (or move) top level repositories')
1282 1282
1283 1283 if owner is not None:
1284 1284 # forbid setting owner for non-admins
1285 1285 raise JSONRPCError(
1286 1286 'Only Kallithea admin can specify `owner` param'
1287 1287 )
1288 1288
1289 1289 updates = {}
1290 1290 repo_group = group
1291 1291 if repo_group is not None:
1292 1292 repo_group = get_repo_group_or_error(repo_group) # TODO: repos can thus currently not be moved to root
1293 1293 if repo_group.group_id != repo.group_id:
1294 1294 if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(repo_group.group_name)):
1295 1295 raise JSONRPCError("no permission to create (or move) repo in %s" % repo_group.group_name)
1296 1296 repo_group = repo_group.group_id
1297 1297 try:
1298 1298 store_update(updates, name, 'repo_name')
1299 1299 store_update(updates, repo_group, 'repo_group')
1300 1300 store_update(updates, owner, 'owner')
1301 1301 store_update(updates, description, 'repo_description')
1302 1302 store_update(updates, private, 'repo_private')
1303 1303 store_update(updates, clone_uri, 'clone_uri')
1304 1304 store_update(updates, landing_rev, 'repo_landing_rev')
1305 1305 store_update(updates, enable_statistics, 'repo_enable_statistics')
1306 1306 store_update(updates, enable_downloads, 'repo_enable_downloads')
1307 1307
1308 1308 RepoModel().update(repo, **updates)
1309 1309 meta.Session().commit()
1310 1310 return dict(
1311 1311 msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
1312 1312 repository=repo.get_api_data()
1313 1313 )
1314 1314 except Exception:
1315 1315 log.error(traceback.format_exc())
1316 1316 raise JSONRPCError('failed to update repo `%s`' % repoid)
1317 1317
1318 1318 # permission check inside
1319 1319 @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
1320 1320 def fork_repo(self, repoid, fork_name,
1321 1321 owner=None,
1322 1322 description='', copy_permissions=False,
1323 1323 private=False, landing_rev='rev:tip'):
1324 1324 """
1325 1325 Creates a fork of given repo. In case of using celery this will
1326 1326 immediately return success message, while fork is going to be created
1327 1327 asynchronous. This command can be executed only using api_key belonging to
1328 1328 user with admin rights or regular user that have fork permission, and at least
1329 1329 read access to forking repository. Regular users cannot specify owner parameter.
1330 1330
1331 1331 :param repoid: repository name or repository id
1332 1332 :type repoid: str or int
1333 1333 :param fork_name:
1334 1334 :param owner:
1335 1335 :param description:
1336 1336 :param copy_permissions:
1337 1337 :param private:
1338 1338 :param landing_rev:
1339 1339
1340 1340 INPUT::
1341 1341
1342 1342 id : <id_for_response>
1343 1343 api_key : "<api_key>"
1344 1344 method : "fork_repo"
1345 1345 args : {
1346 1346 "repoid" : "<reponame or repo_id>",
1347 1347 "fork_name" : "<forkname>",
1348 1348 "owner" : "<username or user_id = Optional(=apiuser)>",
1349 1349 "description" : "<description>",
1350 1350 "copy_permissions": "<bool>",
1351 1351 "private" : "<bool>",
1352 1352 "landing_rev" : "<landing_rev>"
1353 1353 }
1354 1354
1355 1355 OUTPUT::
1356 1356
1357 1357 id : <id_given_in_input>
1358 1358 result : {
1359 1359 "msg" : "Created fork of `<reponame>` as `<forkname>`",
1360 1360 "success" : true
1361 1361 }
1362 1362 error : null
1363 1363 """
1364 1364 repo = get_repo_or_error(repoid)
1365 1365 repo_name = repo.repo_name
1366 1366
1367 1367 _repo = RepoModel().get_by_repo_name(fork_name)
1368 1368 if _repo:
1369 1369 type_ = 'fork' if _repo.fork else 'repo'
1370 1370 raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
1371 1371
1372 1372 group_name = None
1373 1373 fork_name_parts = fork_name.split('/')
1374 1374 if len(fork_name_parts) > 1:
1375 1375 group_name = '/'.join(fork_name_parts[:-1])
1376 1376 repo_group = db.RepoGroup.get_by_group_name(group_name)
1377 1377 if repo_group is None:
1378 1378 raise JSONRPCError("repo group `%s` not found" % group_name)
1379 1379 if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(group_name)):
1380 1380 raise JSONRPCError("no permission to create repo in %s" % group_name)
1381 1381 else:
1382 1382 if not HasPermissionAny('hg.admin', 'hg.create.repository')():
1383 1383 raise JSONRPCError("no permission to create top level repo")
1384 1384
1385 1385 if HasPermissionAny('hg.admin')():
1386 1386 pass
1387 1387 elif HasRepoPermissionLevel('read')(repo.repo_name):
1388 1388 if owner is not None:
1389 1389 # forbid setting owner for non-admins
1390 1390 raise JSONRPCError(
1391 1391 'Only Kallithea admin can specify `owner` param'
1392 1392 )
1393 1393 else:
1394 1394 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
1395 1395
1396 1396 if owner is None:
1397 1397 owner = request.authuser.user_id
1398 1398
1399 1399 owner = get_user_or_error(owner)
1400 1400
1401 1401 try:
1402 1402 form_data = dict(
1403 1403 repo_name=fork_name_parts[-1],
1404 1404 repo_name_full=fork_name,
1405 1405 repo_group=group_name,
1406 1406 repo_type=repo.repo_type,
1407 1407 description=description,
1408 1408 private=private,
1409 1409 copy_permissions=copy_permissions,
1410 1410 landing_rev=landing_rev,
1411 1411 update_after_clone=False,
1412 1412 fork_parent_id=repo.repo_id,
1413 1413 )
1414 1414 RepoModel().create_fork(form_data, cur_user=owner.username)
1415 1415 # no commit, it's done in RepoModel, or async via celery
1416 1416 return dict(
1417 1417 msg='Created fork of `%s` as `%s`' % (repo.repo_name,
1418 1418 fork_name),
1419 1419 success=True, # cannot return the repo data here since fork
1420 1420 # can be done async
1421 1421 )
1422 1422 except Exception:
1423 1423 log.error(traceback.format_exc())
1424 1424 raise JSONRPCError(
1425 1425 'failed to fork repository `%s` as `%s`' % (repo_name,
1426 1426 fork_name)
1427 1427 )
1428 1428
1429 1429 # permission check inside
1430 1430 def delete_repo(self, repoid, forks=''):
1431 1431 """
1432 1432 Deletes a repository. This command can be executed only using api_key belonging
1433 1433 to user with admin rights or regular user that have admin access to repository.
1434 1434 When `forks` param is set it's possible to detach or delete forks of deleting
1435 1435 repository
1436 1436
1437 1437 :param repoid: repository name or repository id
1438 1438 :type repoid: str or int
1439 1439 :param forks: `detach` or `delete`, what do do with attached forks for repo
1440 1440 :type forks: Optional(str)
1441 1441
1442 1442 OUTPUT::
1443 1443
1444 1444 id : <id_given_in_input>
1445 1445 result : {
1446 1446 "msg" : "Deleted repository `<reponame>`",
1447 1447 "success" : true
1448 1448 }
1449 1449 error : null
1450 1450 """
1451 1451 repo = get_repo_or_error(repoid)
1452 1452
1453 1453 if not HasPermissionAny('hg.admin')():
1454 1454 if not HasRepoPermissionLevel('admin')(repo.repo_name):
1455 1455 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
1456 1456
1457 1457 try:
1458 1458 handle_forks = forks
1459 1459 _forks_msg = ''
1460 1460 _forks = [f for f in repo.forks]
1461 1461 if handle_forks == 'detach':
1462 1462 _forks_msg = ' ' + 'Detached %s forks' % len(_forks)
1463 1463 elif handle_forks == 'delete':
1464 1464 _forks_msg = ' ' + 'Deleted %s forks' % len(_forks)
1465 1465 elif _forks:
1466 1466 raise JSONRPCError(
1467 1467 'Cannot delete `%s` it still contains attached forks' %
1468 1468 (repo.repo_name,)
1469 1469 )
1470 1470
1471 1471 RepoModel().delete(repo, forks=forks)
1472 1472 meta.Session().commit()
1473 1473 return dict(
1474 1474 msg='Deleted repository `%s`%s' % (repo.repo_name, _forks_msg),
1475 1475 success=True
1476 1476 )
1477 1477 except Exception:
1478 1478 log.error(traceback.format_exc())
1479 1479 raise JSONRPCError(
1480 1480 'failed to delete repository `%s`' % (repo.repo_name,)
1481 1481 )
1482 1482
1483 1483 @HasPermissionAnyDecorator('hg.admin')
1484 1484 def grant_user_permission(self, repoid, userid, perm):
1485 1485 """
1486 1486 Grant permission for user on given repository, or update existing one
1487 1487 if found. This command can be executed only using api_key belonging to user
1488 1488 with admin rights.
1489 1489
1490 1490 :param repoid: repository name or repository id
1491 1491 :type repoid: str or int
1492 1492 :param userid:
1493 1493 :param perm: (repository.(none|read|write|admin))
1494 1494 :type perm: str
1495 1495
1496 1496 OUTPUT::
1497 1497
1498 1498 id : <id_given_in_input>
1499 1499 result : {
1500 1500 "msg" : "Granted perm: `<perm>` for user: `<username>` in repo: `<reponame>`",
1501 1501 "success" : true
1502 1502 }
1503 1503 error : null
1504 1504 """
1505 1505 repo = get_repo_or_error(repoid)
1506 1506 user = get_user_or_error(userid)
1507 1507 perm = get_perm_or_error(perm)
1508 1508
1509 1509 try:
1510 1510
1511 1511 RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
1512 1512
1513 1513 meta.Session().commit()
1514 1514 return dict(
1515 1515 msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
1516 1516 perm.permission_name, user.username, repo.repo_name
1517 1517 ),
1518 1518 success=True
1519 1519 )
1520 1520 except Exception:
1521 1521 log.error(traceback.format_exc())
1522 1522 raise JSONRPCError(
1523 1523 'failed to edit permission for user: `%s` in repo: `%s`' % (
1524 1524 userid, repoid
1525 1525 )
1526 1526 )
1527 1527
1528 1528 @HasPermissionAnyDecorator('hg.admin')
1529 1529 def revoke_user_permission(self, repoid, userid):
1530 1530 """
1531 1531 Revoke permission for user on given repository. This command can be executed
1532 1532 only using api_key belonging to user with admin rights.
1533 1533
1534 1534 :param repoid: repository name or repository id
1535 1535 :type repoid: str or int
1536 1536 :param userid:
1537 1537
1538 1538 OUTPUT::
1539 1539
1540 1540 id : <id_given_in_input>
1541 1541 result : {
1542 1542 "msg" : "Revoked perm for user: `<username>` in repo: `<reponame>`",
1543 1543 "success" : true
1544 1544 }
1545 1545 error : null
1546 1546 """
1547 1547 repo = get_repo_or_error(repoid)
1548 1548 user = get_user_or_error(userid)
1549 1549 try:
1550 1550 RepoModel().revoke_user_permission(repo=repo, user=user)
1551 1551 meta.Session().commit()
1552 1552 return dict(
1553 1553 msg='Revoked perm for user: `%s` in repo: `%s`' % (
1554 1554 user.username, repo.repo_name
1555 1555 ),
1556 1556 success=True
1557 1557 )
1558 1558 except Exception:
1559 1559 log.error(traceback.format_exc())
1560 1560 raise JSONRPCError(
1561 1561 'failed to edit permission for user: `%s` in repo: `%s`' % (
1562 1562 userid, repoid
1563 1563 )
1564 1564 )
1565 1565
1566 1566 # permission check inside
1567 1567 def grant_user_group_permission(self, repoid, usergroupid, perm):
1568 1568 """
1569 1569 Grant permission for user group on given repository, or update
1570 1570 existing one if found. This command can be executed only using
1571 1571 api_key belonging to user with admin rights.
1572 1572
1573 1573 :param repoid: repository name or repository id
1574 1574 :type repoid: str or int
1575 1575 :param usergroupid: id of usergroup
1576 1576 :type usergroupid: str or int
1577 1577 :param perm: (repository.(none|read|write|admin))
1578 1578 :type perm: str
1579 1579
1580 1580 OUTPUT::
1581 1581
1582 1582 id : <id_given_in_input>
1583 1583 result : {
1584 1584 "msg" : "Granted perm: `<perm>` for group: `<usersgroupname>` in repo: `<reponame>`",
1585 1585 "success" : true
1586 1586 }
1587 1587 error : null
1588 1588
1589 1589 ERROR OUTPUT::
1590 1590
1591 1591 id : <id_given_in_input>
1592 1592 result : null
1593 1593 error : {
1594 1594 "failed to edit permission for user group: `<usergroup>` in repo `<repo>`'
1595 1595 }
1596 1596 """
1597 1597 repo = get_repo_or_error(repoid)
1598 1598 perm = get_perm_or_error(perm)
1599 1599 user_group = get_user_group_or_error(usergroupid)
1600 1600 if not HasPermissionAny('hg.admin')():
1601 1601 if not HasRepoPermissionLevel('admin')(repo.repo_name):
1602 1602 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
1603 1603
1604 1604 if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
1605 1605 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
1606 1606
1607 1607 try:
1608 1608 RepoModel().grant_user_group_permission(
1609 1609 repo=repo, group_name=user_group, perm=perm)
1610 1610
1611 1611 meta.Session().commit()
1612 1612 return dict(
1613 1613 msg='Granted perm: `%s` for user group: `%s` in '
1614 1614 'repo: `%s`' % (
1615 1615 perm.permission_name, user_group.users_group_name,
1616 1616 repo.repo_name
1617 1617 ),
1618 1618 success=True
1619 1619 )
1620 1620 except Exception:
1621 1621 log.error(traceback.format_exc())
1622 1622 raise JSONRPCError(
1623 1623 'failed to edit permission for user group: `%s` in '
1624 1624 'repo: `%s`' % (
1625 1625 usergroupid, repo.repo_name
1626 1626 )
1627 1627 )
1628 1628
1629 1629 # permission check inside
1630 1630 def revoke_user_group_permission(self, repoid, usergroupid):
1631 1631 """
1632 1632 Revoke permission for user group on given repository. This command can be
1633 1633 executed only using api_key belonging to user with admin rights.
1634 1634
1635 1635 :param repoid: repository name or repository id
1636 1636 :type repoid: str or int
1637 1637 :param usergroupid:
1638 1638
1639 1639 OUTPUT::
1640 1640
1641 1641 id : <id_given_in_input>
1642 1642 result : {
1643 1643 "msg" : "Revoked perm for group: `<usersgroupname>` in repo: `<reponame>`",
1644 1644 "success" : true
1645 1645 }
1646 1646 error : null
1647 1647 """
1648 1648 repo = get_repo_or_error(repoid)
1649 1649 user_group = get_user_group_or_error(usergroupid)
1650 1650 if not HasPermissionAny('hg.admin')():
1651 1651 if not HasRepoPermissionLevel('admin')(repo.repo_name):
1652 1652 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
1653 1653
1654 1654 if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
1655 1655 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
1656 1656
1657 1657 try:
1658 1658 RepoModel().revoke_user_group_permission(
1659 1659 repo=repo, group_name=user_group)
1660 1660
1661 1661 meta.Session().commit()
1662 1662 return dict(
1663 1663 msg='Revoked perm for user group: `%s` in repo: `%s`' % (
1664 1664 user_group.users_group_name, repo.repo_name
1665 1665 ),
1666 1666 success=True
1667 1667 )
1668 1668 except Exception:
1669 1669 log.error(traceback.format_exc())
1670 1670 raise JSONRPCError(
1671 1671 'failed to edit permission for user group: `%s` in '
1672 1672 'repo: `%s`' % (
1673 1673 user_group.users_group_name, repo.repo_name
1674 1674 )
1675 1675 )
1676 1676
1677 1677 @HasPermissionAnyDecorator('hg.admin')
1678 1678 def get_repo_group(self, repogroupid):
1679 1679 """
1680 1680 Returns given repo group together with permissions, and repositories
1681 1681 inside the group
1682 1682
1683 1683 :param repogroupid: id/name of repository group
1684 1684 :type repogroupid: str or int
1685 1685 """
1686 1686 repo_group = get_repo_group_or_error(repogroupid)
1687 1687
1688 1688 members = []
1689 1689 for user in repo_group.repo_group_to_perm:
1690 1690 perm = user.permission.permission_name
1691 1691 user = user.user
1692 1692 user_data = {
1693 1693 'name': user.username,
1694 1694 'type': "user",
1695 1695 'permission': perm
1696 1696 }
1697 1697 members.append(user_data)
1698 1698
1699 1699 for user_group in repo_group.users_group_to_perm:
1700 1700 perm = user_group.permission.permission_name
1701 1701 user_group = user_group.users_group
1702 1702 user_group_data = {
1703 1703 'name': user_group.users_group_name,
1704 1704 'type': "user_group",
1705 1705 'permission': perm
1706 1706 }
1707 1707 members.append(user_group_data)
1708 1708
1709 1709 data = repo_group.get_api_data()
1710 1710 data["members"] = members
1711 1711 return data
1712 1712
1713 1713 @HasPermissionAnyDecorator('hg.admin')
1714 1714 def get_repo_groups(self):
1715 1715 """
1716 1716 Returns all repository groups
1717 1717 """
1718 1718 return [
1719 1719 repo_group.get_api_data()
1720 1720 for repo_group in db.RepoGroup.query()
1721 1721 ]
1722 1722
1723 1723 @HasPermissionAnyDecorator('hg.admin')
1724 1724 def create_repo_group(self, group_name, description='',
1725 1725 owner=None,
1726 1726 parent=None,
1727 1727 copy_permissions=False):
1728 1728 """
1729 1729 Creates a repository group. This command can be executed only using
1730 1730 api_key belonging to user with admin rights.
1731 1731
1732 1732 :param group_name:
1733 1733 :type group_name:
1734 1734 :param description:
1735 1735 :type description:
1736 1736 :param owner:
1737 1737 :type owner:
1738 1738 :param parent:
1739 1739 :type parent:
1740 1740 :param copy_permissions:
1741 1741 :type copy_permissions:
1742 1742
1743 1743 OUTPUT::
1744 1744
1745 1745 id : <id_given_in_input>
1746 1746 result : {
1747 1747 "msg" : "created new repo group `<repo_group_name>`",
1748 1748 "repo_group" : <repogroup_object>
1749 1749 }
1750 1750 error : null
1751 1751
1752 1752 ERROR OUTPUT::
1753 1753
1754 1754 id : <id_given_in_input>
1755 1755 result : null
1756 1756 error : {
1757 1757 failed to create repo group `<repogroupid>`
1758 1758 }
1759 1759 """
1760 1760 if db.RepoGroup.get_by_group_name(group_name):
1761 1761 raise JSONRPCError("repo group `%s` already exist" % (group_name,))
1762 1762
1763 1763 if owner is None:
1764 1764 owner = request.authuser.user_id
1765 1765 group_description = description
1766 1766 parent_group = None
1767 1767 if parent is not None:
1768 1768 parent_group = get_repo_group_or_error(parent)
1769 1769
1770 1770 try:
1771 1771 repo_group = RepoGroupModel().create(
1772 1772 group_name=group_name,
1773 1773 group_description=group_description,
1774 1774 owner=owner,
1775 1775 parent=parent_group,
1776 1776 copy_permissions=copy_permissions
1777 1777 )
1778 1778 meta.Session().commit()
1779 1779 return dict(
1780 1780 msg='created new repo group `%s`' % group_name,
1781 1781 repo_group=repo_group.get_api_data()
1782 1782 )
1783 1783 except Exception:
1784 1784
1785 1785 log.error(traceback.format_exc())
1786 1786 raise JSONRPCError('failed to create repo group `%s`' % (group_name,))
1787 1787
1788 1788 @HasPermissionAnyDecorator('hg.admin')
1789 1789 def update_repo_group(self, repogroupid, group_name=None,
1790 1790 description=None,
1791 1791 owner=None,
1792 1792 parent=None):
1793 1793 repo_group = get_repo_group_or_error(repogroupid)
1794 parent_repo_group_id = None if parent is None else get_repo_group_or_error(parent).group_id
1794 1795
1795 1796 updates = {}
1796 1797 try:
1797 1798 store_update(updates, group_name, 'group_name')
1798 1799 store_update(updates, description, 'group_description')
1799 1800 store_update(updates, owner, 'owner')
1800 store_update(updates, parent, 'parent_group')
1801 store_update(updates, parent_repo_group_id, 'parent_group_id')
1801 1802 repo_group = RepoGroupModel().update(repo_group, updates)
1802 1803 meta.Session().commit()
1803 1804 return dict(
1804 1805 msg='updated repository group ID:%s %s' % (repo_group.group_id,
1805 1806 repo_group.group_name),
1806 1807 repo_group=repo_group.get_api_data()
1807 1808 )
1808 1809 except Exception:
1809 1810 log.error(traceback.format_exc())
1810 1811 raise JSONRPCError('failed to update repository group `%s`'
1811 1812 % (repogroupid,))
1812 1813
1813 1814 @HasPermissionAnyDecorator('hg.admin')
1814 1815 def delete_repo_group(self, repogroupid):
1815 1816 """
1816 1817 :param repogroupid: name or id of repository group
1817 1818 :type repogroupid: str or int
1818 1819
1819 1820 OUTPUT::
1820 1821
1821 1822 id : <id_given_in_input>
1822 1823 result : {
1823 1824 'msg' : 'deleted repo group ID:<repogroupid> <repogroupname>
1824 1825 'repo_group' : null
1825 1826 }
1826 1827 error : null
1827 1828
1828 1829 ERROR OUTPUT::
1829 1830
1830 1831 id : <id_given_in_input>
1831 1832 result : null
1832 1833 error : {
1833 1834 "failed to delete repo group ID:<repogroupid> <repogroupname>"
1834 1835 }
1835 1836 """
1836 1837 repo_group = get_repo_group_or_error(repogroupid)
1837 1838
1838 1839 try:
1839 1840 RepoGroupModel().delete(repo_group)
1840 1841 meta.Session().commit()
1841 1842 return dict(
1842 1843 msg='deleted repo group ID:%s %s' %
1843 1844 (repo_group.group_id, repo_group.group_name),
1844 1845 repo_group=None
1845 1846 )
1846 1847 except Exception:
1847 1848 log.error(traceback.format_exc())
1848 1849 raise JSONRPCError('failed to delete repo group ID:%s %s' %
1849 1850 (repo_group.group_id, repo_group.group_name)
1850 1851 )
1851 1852
1852 1853 # permission check inside
1853 1854 def grant_user_permission_to_repo_group(self, repogroupid, userid,
1854 1855 perm, apply_to_children='none'):
1855 1856 """
1856 1857 Grant permission for user on given repository group, or update existing
1857 1858 one if found. This command can be executed only using api_key belonging
1858 1859 to user with admin rights, or user who has admin right to given repository
1859 1860 group.
1860 1861
1861 1862 :param repogroupid: name or id of repository group
1862 1863 :type repogroupid: str or int
1863 1864 :param userid:
1864 1865 :param perm: (group.(none|read|write|admin))
1865 1866 :type perm: str
1866 1867 :param apply_to_children: 'none', 'repos', 'groups', 'all'
1867 1868 :type apply_to_children: str
1868 1869
1869 1870 OUTPUT::
1870 1871
1871 1872 id : <id_given_in_input>
1872 1873 result : {
1873 1874 "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
1874 1875 "success" : true
1875 1876 }
1876 1877 error : null
1877 1878
1878 1879 ERROR OUTPUT::
1879 1880
1880 1881 id : <id_given_in_input>
1881 1882 result : null
1882 1883 error : {
1883 1884 "failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
1884 1885 }
1885 1886 """
1886 1887 repo_group = get_repo_group_or_error(repogroupid)
1887 1888
1888 1889 if not HasPermissionAny('hg.admin')():
1889 1890 if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
1890 1891 raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
1891 1892
1892 1893 user = get_user_or_error(userid)
1893 1894 perm = get_perm_or_error(perm, prefix='group.')
1894 1895
1895 1896 try:
1896 1897 RepoGroupModel().add_permission(repo_group=repo_group,
1897 1898 obj=user,
1898 1899 obj_type="user",
1899 1900 perm=perm,
1900 1901 recursive=apply_to_children)
1901 1902 meta.Session().commit()
1902 1903 return dict(
1903 1904 msg='Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1904 1905 perm.permission_name, apply_to_children, user.username, repo_group.name
1905 1906 ),
1906 1907 success=True
1907 1908 )
1908 1909 except Exception:
1909 1910 log.error(traceback.format_exc())
1910 1911 raise JSONRPCError(
1911 1912 'failed to edit permission for user: `%s` in repo group: `%s`' % (
1912 1913 userid, repo_group.name))
1913 1914
1914 1915 # permission check inside
1915 1916 def revoke_user_permission_from_repo_group(self, repogroupid, userid,
1916 1917 apply_to_children='none'):
1917 1918 """
1918 1919 Revoke permission for user on given repository group. This command can
1919 1920 be executed only using api_key belonging to user with admin rights, or
1920 1921 user who has admin right to given repository group.
1921 1922
1922 1923 :param repogroupid: name or id of repository group
1923 1924 :type repogroupid: str or int
1924 1925 :param userid:
1925 1926 :type userid:
1926 1927 :param apply_to_children: 'none', 'repos', 'groups', 'all'
1927 1928 :type apply_to_children: str
1928 1929
1929 1930 OUTPUT::
1930 1931
1931 1932 id : <id_given_in_input>
1932 1933 result : {
1933 1934 "msg" : "Revoked perm (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
1934 1935 "success" : true
1935 1936 }
1936 1937 error : null
1937 1938
1938 1939 ERROR OUTPUT::
1939 1940
1940 1941 id : <id_given_in_input>
1941 1942 result : null
1942 1943 error : {
1943 1944 "failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
1944 1945 }
1945 1946 """
1946 1947 repo_group = get_repo_group_or_error(repogroupid)
1947 1948
1948 1949 if not HasPermissionAny('hg.admin')():
1949 1950 if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
1950 1951 raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
1951 1952
1952 1953 user = get_user_or_error(userid)
1953 1954
1954 1955 try:
1955 1956 RepoGroupModel().delete_permission(repo_group=repo_group,
1956 1957 obj=user,
1957 1958 obj_type="user",
1958 1959 recursive=apply_to_children)
1959 1960
1960 1961 meta.Session().commit()
1961 1962 return dict(
1962 1963 msg='Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
1963 1964 apply_to_children, user.username, repo_group.name
1964 1965 ),
1965 1966 success=True
1966 1967 )
1967 1968 except Exception:
1968 1969 log.error(traceback.format_exc())
1969 1970 raise JSONRPCError(
1970 1971 'failed to edit permission for user: `%s` in repo group: `%s`' % (
1971 1972 userid, repo_group.name))
1972 1973
1973 1974 # permission check inside
1974 1975 def grant_user_group_permission_to_repo_group(
1975 1976 self, repogroupid, usergroupid, perm,
1976 1977 apply_to_children='none'):
1977 1978 """
1978 1979 Grant permission for user group on given repository group, or update
1979 1980 existing one if found. This command can be executed only using
1980 1981 api_key belonging to user with admin rights, or user who has admin
1981 1982 right to given repository group.
1982 1983
1983 1984 :param repogroupid: name or id of repository group
1984 1985 :type repogroupid: str or int
1985 1986 :param usergroupid: id of usergroup
1986 1987 :type usergroupid: str or int
1987 1988 :param perm: (group.(none|read|write|admin))
1988 1989 :type perm: str
1989 1990 :param apply_to_children: 'none', 'repos', 'groups', 'all'
1990 1991 :type apply_to_children: str
1991 1992
1992 1993 OUTPUT::
1993 1994
1994 1995 id : <id_given_in_input>
1995 1996 result : {
1996 1997 "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
1997 1998 "success" : true
1998 1999 }
1999 2000 error : null
2000 2001
2001 2002 ERROR OUTPUT::
2002 2003
2003 2004 id : <id_given_in_input>
2004 2005 result : null
2005 2006 error : {
2006 2007 "failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
2007 2008 }
2008 2009 """
2009 2010 repo_group = get_repo_group_or_error(repogroupid)
2010 2011 perm = get_perm_or_error(perm, prefix='group.')
2011 2012 user_group = get_user_group_or_error(usergroupid)
2012 2013 if not HasPermissionAny('hg.admin')():
2013 2014 if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
2014 2015 raise JSONRPCError(
2015 2016 'repository group `%s` does not exist' % (repogroupid,))
2016 2017
2017 2018 if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
2018 2019 raise JSONRPCError(
2019 2020 'user group `%s` does not exist' % (usergroupid,))
2020 2021
2021 2022 try:
2022 2023 RepoGroupModel().add_permission(repo_group=repo_group,
2023 2024 obj=user_group,
2024 2025 obj_type="user_group",
2025 2026 perm=perm,
2026 2027 recursive=apply_to_children)
2027 2028 meta.Session().commit()
2028 2029 return dict(
2029 2030 msg='Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2030 2031 perm.permission_name, apply_to_children,
2031 2032 user_group.users_group_name, repo_group.name
2032 2033 ),
2033 2034 success=True
2034 2035 )
2035 2036 except Exception:
2036 2037 log.error(traceback.format_exc())
2037 2038 raise JSONRPCError(
2038 2039 'failed to edit permission for user group: `%s` in '
2039 2040 'repo group: `%s`' % (
2040 2041 usergroupid, repo_group.name
2041 2042 )
2042 2043 )
2043 2044
2044 2045 # permission check inside
2045 2046 def revoke_user_group_permission_from_repo_group(
2046 2047 self, repogroupid, usergroupid,
2047 2048 apply_to_children='none'):
2048 2049 """
2049 2050 Revoke permission for user group on given repository. This command can be
2050 2051 executed only using api_key belonging to user with admin rights, or
2051 2052 user who has admin right to given repository group.
2052 2053
2053 2054 :param repogroupid: name or id of repository group
2054 2055 :type repogroupid: str or int
2055 2056 :param usergroupid:
2056 2057 :param apply_to_children: 'none', 'repos', 'groups', 'all'
2057 2058 :type apply_to_children: str
2058 2059
2059 2060 OUTPUT::
2060 2061
2061 2062 id : <id_given_in_input>
2062 2063 result : {
2063 2064 "msg" : "Revoked perm (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
2064 2065 "success" : true
2065 2066 }
2066 2067 error : null
2067 2068
2068 2069 ERROR OUTPUT::
2069 2070
2070 2071 id : <id_given_in_input>
2071 2072 result : null
2072 2073 error : {
2073 2074 "failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
2074 2075 }
2075 2076 """
2076 2077 repo_group = get_repo_group_or_error(repogroupid)
2077 2078 user_group = get_user_group_or_error(usergroupid)
2078 2079 if not HasPermissionAny('hg.admin')():
2079 2080 if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
2080 2081 raise JSONRPCError(
2081 2082 'repository group `%s` does not exist' % (repogroupid,))
2082 2083
2083 2084 if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
2084 2085 raise JSONRPCError(
2085 2086 'user group `%s` does not exist' % (usergroupid,))
2086 2087
2087 2088 try:
2088 2089 RepoGroupModel().delete_permission(repo_group=repo_group,
2089 2090 obj=user_group,
2090 2091 obj_type="user_group",
2091 2092 recursive=apply_to_children)
2092 2093 meta.Session().commit()
2093 2094 return dict(
2094 2095 msg='Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2095 2096 apply_to_children, user_group.users_group_name, repo_group.name
2096 2097 ),
2097 2098 success=True
2098 2099 )
2099 2100 except Exception:
2100 2101 log.error(traceback.format_exc())
2101 2102 raise JSONRPCError(
2102 2103 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2103 2104 user_group.users_group_name, repo_group.name
2104 2105 )
2105 2106 )
2106 2107
2107 2108 def get_gist(self, gistid):
2108 2109 """
2109 2110 Get given gist by id
2110 2111
2111 2112 :param gistid: id of private or public gist
2112 2113 :type gistid: str
2113 2114 """
2114 2115 gist = get_gist_or_error(gistid)
2115 2116 if not HasPermissionAny('hg.admin')():
2116 2117 if gist.owner_id != request.authuser.user_id:
2117 2118 raise JSONRPCError('gist `%s` does not exist' % (gistid,))
2118 2119 return gist.get_api_data()
2119 2120
2120 2121 def get_gists(self, userid=None):
2121 2122 """
2122 2123 Get all gists for given user. If userid is empty returned gists
2123 2124 are for user who called the api
2124 2125
2125 2126 :param userid: user to get gists for
2126 2127 :type userid: Optional(str or int)
2127 2128 """
2128 2129 if not HasPermissionAny('hg.admin')():
2129 2130 # make sure normal user does not pass someone else userid,
2130 2131 # he is not allowed to do that
2131 2132 if userid is not None and userid != request.authuser.user_id:
2132 2133 raise JSONRPCError(
2133 2134 'userid is not the same as your user'
2134 2135 )
2135 2136
2136 2137 if userid is None:
2137 2138 user_id = request.authuser.user_id
2138 2139 else:
2139 2140 user_id = get_user_or_error(userid).user_id
2140 2141
2141 2142 return [
2142 2143 gist.get_api_data()
2143 2144 for gist in db.Gist().query()
2144 2145 .filter_by(is_expired=False)
2145 2146 .filter(db.Gist.owner_id == user_id)
2146 2147 .order_by(db.Gist.created_on.desc())
2147 2148 ]
2148 2149
2149 2150 def create_gist(self, files, owner=None,
2150 2151 gist_type=db.Gist.GIST_PUBLIC, lifetime=-1,
2151 2152 description=''):
2152 2153 """
2153 2154 Creates new Gist
2154 2155
2155 2156 :param files: files to be added to gist
2156 2157 {'filename': {'content':'...', 'lexer': null},
2157 2158 'filename2': {'content':'...', 'lexer': null}}
2158 2159 :type files: dict
2159 2160 :param owner: gist owner, defaults to api method caller
2160 2161 :type owner: Optional(str or int)
2161 2162 :param gist_type: type of gist 'public' or 'private'
2162 2163 :type gist_type: Optional(str)
2163 2164 :param lifetime: time in minutes of gist lifetime
2164 2165 :type lifetime: Optional(int)
2165 2166 :param description: gist description
2166 2167 :type description: str
2167 2168
2168 2169 OUTPUT::
2169 2170
2170 2171 id : <id_given_in_input>
2171 2172 result : {
2172 2173 "msg" : "created new gist",
2173 2174 "gist" : <gist_object>
2174 2175 }
2175 2176 error : null
2176 2177
2177 2178 ERROR OUTPUT::
2178 2179
2179 2180 id : <id_given_in_input>
2180 2181 result : null
2181 2182 error : {
2182 2183 "failed to create gist"
2183 2184 }
2184 2185 """
2185 2186 try:
2186 2187 if owner is None:
2187 2188 owner = request.authuser.user_id
2188 2189
2189 2190 owner = get_user_or_error(owner)
2190 2191
2191 2192 gist = GistModel().create(description=description,
2192 2193 owner=owner,
2193 2194 ip_addr=request.ip_addr,
2194 2195 gist_mapping=files,
2195 2196 gist_type=gist_type,
2196 2197 lifetime=lifetime)
2197 2198 meta.Session().commit()
2198 2199 return dict(
2199 2200 msg='created new gist',
2200 2201 gist=gist.get_api_data()
2201 2202 )
2202 2203 except Exception:
2203 2204 log.error(traceback.format_exc())
2204 2205 raise JSONRPCError('failed to create gist')
2205 2206
2206 2207 # permission check inside
2207 2208 def delete_gist(self, gistid):
2208 2209 """
2209 2210 Deletes existing gist
2210 2211
2211 2212 :param gistid: id of gist to delete
2212 2213 :type gistid: str
2213 2214
2214 2215 OUTPUT::
2215 2216
2216 2217 id : <id_given_in_input>
2217 2218 result : {
2218 2219 "msg" : "deleted gist ID: <gist_id>",
2219 2220 "gist" : null
2220 2221 }
2221 2222 error : null
2222 2223
2223 2224 ERROR OUTPUT::
2224 2225
2225 2226 id : <id_given_in_input>
2226 2227 result : null
2227 2228 error : {
2228 2229 "failed to delete gist ID:<gist_id>"
2229 2230 }
2230 2231 """
2231 2232 gist = get_gist_or_error(gistid)
2232 2233 if not HasPermissionAny('hg.admin')():
2233 2234 if gist.owner_id != request.authuser.user_id:
2234 2235 raise JSONRPCError('gist `%s` does not exist' % (gistid,))
2235 2236
2236 2237 try:
2237 2238 GistModel().delete(gist)
2238 2239 meta.Session().commit()
2239 2240 return dict(
2240 2241 msg='deleted gist ID:%s' % (gist.gist_access_id,),
2241 2242 gist=None
2242 2243 )
2243 2244 except Exception:
2244 2245 log.error(traceback.format_exc())
2245 2246 raise JSONRPCError('failed to delete gist ID:%s'
2246 2247 % (gist.gist_access_id,))
2247 2248
2248 2249 # permission check inside
2249 2250 def get_changesets(self, repoid, start=None, end=None, start_date=None,
2250 2251 end_date=None, branch_name=None, reverse=False, with_file_list=False, max_revisions=None):
2251 2252 repo = get_repo_or_error(repoid)
2252 2253 if not HasRepoPermissionLevel('read')(repo.repo_name):
2253 2254 raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
2254 2255
2255 2256 format = "%Y-%m-%dT%H:%M:%S"
2256 2257 try:
2257 2258 return [e.__json__(with_file_list) for e in
2258 2259 repo.scm_instance.get_changesets(start,
2259 2260 end,
2260 2261 datetime.strptime(start_date, format) if start_date else None,
2261 2262 datetime.strptime(end_date, format) if end_date else None,
2262 2263 branch_name,
2263 2264 reverse, max_revisions)]
2264 2265 except EmptyRepositoryError as e:
2265 2266 raise JSONRPCError('Repository is empty')
2266 2267
2267 2268 # permission check inside
2268 2269 def get_changeset(self, repoid, raw_id, with_reviews=False):
2269 2270 repo = get_repo_or_error(repoid)
2270 2271 if not HasRepoPermissionLevel('read')(repo.repo_name):
2271 2272 raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
2272 2273 changeset = repo.get_changeset(raw_id)
2273 2274 if isinstance(changeset, EmptyChangeset):
2274 2275 raise JSONRPCError('Changeset %s does not exist' % raw_id)
2275 2276
2276 2277 info = dict(changeset.as_dict())
2277 2278
2278 2279 if with_reviews:
2279 2280 reviews = ChangesetStatusModel().get_statuses(
2280 2281 repo.repo_name, raw_id)
2281 2282 info["reviews"] = reviews
2282 2283
2283 2284 return info
2284 2285
2285 2286 # permission check inside
2286 2287 def get_pullrequest(self, pullrequest_id):
2287 2288 """
2288 2289 Get given pull request by id
2289 2290 """
2290 2291 pull_request = db.PullRequest.get(pullrequest_id)
2291 2292 if pull_request is None:
2292 2293 raise JSONRPCError('pull request `%s` does not exist' % (pullrequest_id,))
2293 2294 if not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name):
2294 2295 raise JSONRPCError('not allowed')
2295 2296 return pull_request.get_api_data()
2296 2297
2297 2298 # permission check inside
2298 2299 def comment_pullrequest(self, pull_request_id, comment_msg='', status=None, close_pr=False):
2299 2300 """
2300 2301 Add comment, close and change status of pull request.
2301 2302 """
2302 2303 apiuser = get_user_or_error(request.authuser.user_id)
2303 2304 pull_request = db.PullRequest.get(pull_request_id)
2304 2305 if pull_request is None:
2305 2306 raise JSONRPCError('pull request `%s` does not exist' % (pull_request_id,))
2306 2307 if (not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name)):
2307 2308 raise JSONRPCError('No permission to add comment. User needs at least reading permissions'
2308 2309 ' to the source repository.')
2309 2310 owner = apiuser.user_id == pull_request.owner_id
2310 2311 reviewer = apiuser.user_id in [reviewer.user_id for reviewer in pull_request.reviewers]
2311 2312 if close_pr and not (apiuser.admin or owner):
2312 2313 raise JSONRPCError('No permission to close pull request. User needs to be admin or owner.')
2313 2314 if status and not (apiuser.admin or owner or reviewer):
2314 2315 raise JSONRPCError('No permission to change pull request status. User needs to be admin, owner or reviewer.')
2315 2316 if pull_request.is_closed():
2316 2317 raise JSONRPCError('pull request is already closed')
2317 2318
2318 2319 comment = ChangesetCommentsModel().create(
2319 2320 text=comment_msg,
2320 2321 repo=pull_request.org_repo.repo_id,
2321 2322 author=apiuser.user_id,
2322 2323 pull_request=pull_request.pull_request_id,
2323 2324 f_path=None,
2324 2325 line_no=None,
2325 2326 status_change=db.ChangesetStatus.get_status_lbl(status),
2326 2327 closing_pr=close_pr
2327 2328 )
2328 2329 userlog.action_logger(apiuser,
2329 2330 'user_commented_pull_request:%s' % pull_request_id,
2330 2331 pull_request.org_repo, request.ip_addr)
2331 2332 if status:
2332 2333 ChangesetStatusModel().set_status(
2333 2334 pull_request.org_repo_id,
2334 2335 status,
2335 2336 apiuser.user_id,
2336 2337 comment,
2337 2338 pull_request=pull_request_id
2338 2339 )
2339 2340 if close_pr:
2340 2341 PullRequestModel().close_pull_request(pull_request_id)
2341 2342 userlog.action_logger(apiuser,
2342 2343 'user_closed_pull_request:%s' % pull_request_id,
2343 2344 pull_request.org_repo, request.ip_addr)
2344 2345 meta.Session().commit()
2345 2346 return True
2346 2347
2347 2348 # permission check inside
2348 2349 def edit_reviewers(self, pull_request_id, add=None, remove=None):
2349 2350 """
2350 2351 Add and/or remove one or more reviewers to a pull request, by username
2351 2352 or user ID. Reviewers are specified either as a single-user string or
2352 2353 as a JSON list of one or more strings.
2353 2354 """
2354 2355 if add is None and remove is None:
2355 2356 raise JSONRPCError('''Invalid request. Neither 'add' nor 'remove' is specified.''')
2356 2357
2357 2358 pull_request = db.PullRequest.get(pull_request_id)
2358 2359 if pull_request is None:
2359 2360 raise JSONRPCError('pull request `%s` does not exist' % (pull_request_id,))
2360 2361
2361 2362 apiuser = get_user_or_error(request.authuser.user_id)
2362 2363 is_owner = apiuser.user_id == pull_request.owner_id
2363 2364 is_repo_admin = HasRepoPermissionLevel('admin')(pull_request.other_repo.repo_name)
2364 2365 if not (apiuser.admin or is_repo_admin or is_owner):
2365 2366 raise JSONRPCError('No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.')
2366 2367 if pull_request.is_closed():
2367 2368 raise JSONRPCError('Cannot edit reviewers of a closed pull request.')
2368 2369
2369 2370 if not isinstance(add, list):
2370 2371 add = [add]
2371 2372 if not isinstance(remove, list):
2372 2373 remove = [remove]
2373 2374
2374 2375 # look up actual user objects from given name or id. Bail out if unknown.
2375 2376 add_objs = set(get_user_or_error(user) for user in add if user is not None)
2376 2377 remove_objs = set(get_user_or_error(user) for user in remove if user is not None)
2377 2378
2378 2379 new_reviewers = redundant_reviewers = set()
2379 2380 if add_objs:
2380 2381 new_reviewers, redundant_reviewers = PullRequestModel().add_reviewers(apiuser, pull_request, add_objs)
2381 2382 if remove_objs:
2382 2383 PullRequestModel().remove_reviewers(apiuser, pull_request, remove_objs)
2383 2384
2384 2385 meta.Session().commit()
2385 2386
2386 2387 return {
2387 2388 'added': [x.username for x in new_reviewers],
2388 2389 'already_present': [x.username for x in redundant_reviewers],
2389 2390 # NOTE: no explicit check that removed reviewers were actually present.
2390 2391 'removed': [x.username for x in remove_objs],
2391 2392 }
@@ -1,224 +1,225 b''
1 1 ## -*- coding: utf-8 -*-
2 2 <%inherit file="/base/base.html"/>
3 3 <%block name="title">
4 4 ${_('About')}
5 5 </%block>
6 6 <%block name="header_menu">
7 7 ${self.menu('about')}
8 8 </%block>
9 9 <%def name="main()">
10 10
11 11 <div class="panel panel-primary">
12 12 <div class="panel-heading">
13 13 <h5 class="panel-title">${_('About')} Kallithea</h5>
14 14 </div>
15 15
16 16 <div class="panel-body panel-about">
17 17 <p><a href="https://kallithea-scm.org/">Kallithea</a> is a project of the
18 18 <a href="http://sfconservancy.org/">Software Freedom Conservancy, Inc.</a>
19 19 and is released under the terms of the
20 20 <a href="http://www.gnu.org/copyleft/gpl.html">GNU General Public License,
21 21 v 3.0 (GPLv3)</a>.</p>
22 22
23 23 <p>Kallithea is copyrighted by various authors, including but not
24 24 necessarily limited to the following:</p>
25 25 <ul>
26 26
27 27 <li>Copyright &copy; 2012&ndash;2022, Mads Kiilerich</li>
28 28 <li>Copyright &copy; 2019&ndash;2020, 2022, Manuel Jacob</li>
29 <li>Copyright &copy; 2022, toras9000</li>
29 30 <li>Copyright &copy; 2014&ndash;2021, Thomas De Schampheleire</li>
30 31 <li>Copyright &copy; 2015&ndash;2017, 2019&ndash;2021, Γ‰tienne Gilli</li>
31 32 <li>Copyright &copy; 2018&ndash;2021, ssantos</li>
32 33 <li>Copyright &copy; 2019&ndash;2021, Private</li>
33 34 <li>Copyright &copy; 2020&ndash;2021, fresh</li>
34 35 <li>Copyright &copy; 2020&ndash;2021, robertus</li>
35 36 <li>Copyright &copy; 2021, Eugenia Russell</li>
36 37 <li>Copyright &copy; 2021, Michalis</li>
37 38 <li>Copyright &copy; 2021, vs</li>
38 39 <li>Copyright &copy; 2021, АлСксандр</li>
39 40 <li>Copyright &copy; 2016&ndash;2017, 2020, Asterios Dimitriou</li>
40 41 <li>Copyright &copy; 2017&ndash;2020, Allan NordhΓΈy</li>
41 42 <li>Copyright &copy; 2017, 2020, Anton Schur</li>
42 43 <li>Copyright &copy; 2020, Artem</li>
43 44 <li>Copyright &copy; 2020, David Ignjić</li>
44 45 <li>Copyright &copy; 2020, Dennis Fink</li>
45 46 <li>Copyright &copy; 2020, J. Lavoie</li>
46 47 <li>Copyright &copy; 2020, Ross Thomas</li>
47 48 <li>Copyright &copy; 2020, Tim Ooms</li>
48 49 <li>Copyright &copy; 2012, 2014&ndash;2017, 2019, Andrej Shadura</li>
49 50 <li>Copyright &copy; 2019, Adi Kriegisch</li>
50 51 <li>Copyright &copy; 2019, Danni Randeris</li>
51 52 <li>Copyright &copy; 2019, Edmund Wong</li>
52 53 <li>Copyright &copy; 2019, Elizabeth Sherrock</li>
53 54 <li>Copyright &copy; 2019, Hüseyin Tunç</li>
54 55 <li>Copyright &copy; 2019, leela</li>
55 56 <li>Copyright &copy; 2019, Mateusz Mendel</li>
56 57 <li>Copyright &copy; 2019, Nathan</li>
57 58 <li>Copyright &copy; 2019, Oleksandr Shtalinberg</li>
58 59 <li>Copyright &copy; 2019, THANOS SIOURDAKIS</li>
59 60 <li>Copyright &copy; 2019, Wolfgang Scherer</li>
60 61 <li>Copyright &copy; 2019, Π₯ристо Π‘Ρ‚Π°Π½Π΅Π²</li>
61 62 <li>Copyright &copy; 2012, 2014&ndash;2018, Dominik Ruf</li>
62 63 <li>Copyright &copy; 2014&ndash;2015, 2018, Michal ČihaΕ™</li>
63 64 <li>Copyright &copy; 2015, 2018, Branko Majic</li>
64 65 <li>Copyright &copy; 2018, Chris Rule</li>
65 66 <li>Copyright &copy; 2018, JesΓΊs SΓ‘nchez</li>
66 67 <li>Copyright &copy; 2018, Patrick Vane</li>
67 68 <li>Copyright &copy; 2018, Pheng Heong Tan</li>
68 69 <li>Copyright &copy; 2018, Максим Π―ΠΊΠΈΠΌΡ‡ΡƒΠΊ</li>
69 70 <li>Copyright &copy; 2018, ΠœΠ°Ρ€Ρ Π―ΠΌΠ±Π°Ρ€</li>
70 71 <li>Copyright &copy; 2012&ndash;2017, Unity Technologies</li>
71 72 <li>Copyright &copy; 2015&ndash;2017, SΓΈren LΓΈvborg</li>
72 73 <li>Copyright &copy; 2015, 2017, Sam Jaques</li>
73 74 <li>Copyright &copy; 2017, Alessandro Molina</li>
74 75 <li>Copyright &copy; 2017, Ching-Chen Mao</li>
75 76 <li>Copyright &copy; 2017, Eivind Tagseth</li>
76 77 <li>Copyright &copy; 2017, FUJIWARA Katsunori</li>
77 78 <li>Copyright &copy; 2017, Holger Schramm</li>
78 79 <li>Copyright &copy; 2017, Karl Goetz</li>
79 80 <li>Copyright &copy; 2017, Lars Kruse</li>
80 81 <li>Copyright &copy; 2017, Marko Semet</li>
81 82 <li>Copyright &copy; 2017, Viktar Vauchkevich</li>
82 83 <li>Copyright &copy; 2012&ndash;2016, Takumi IINO</li>
83 84 <li>Copyright &copy; 2015&ndash;2016, Jan Heylen</li>
84 85 <li>Copyright &copy; 2015&ndash;2016, Robert Martinez</li>
85 86 <li>Copyright &copy; 2015&ndash;2016, Robert Rauch</li>
86 87 <li>Copyright &copy; 2016, Angel Ezquerra</li>
87 88 <li>Copyright &copy; 2016, Anton Shestakov</li>
88 89 <li>Copyright &copy; 2016, Brandon Jones</li>
89 90 <li>Copyright &copy; 2016, Kateryna Musina</li>
90 91 <li>Copyright &copy; 2016, Konstantin Veretennicov</li>
91 92 <li>Copyright &copy; 2016, Oscar Curero</li>
92 93 <li>Copyright &copy; 2016, Robert James Dennington</li>
93 94 <li>Copyright &copy; 2016, timeless@gmail.com</li>
94 95 <li>Copyright &copy; 2016, YFdyh000</li>
95 96 <li>Copyright &copy; 2012&ndash;2013, 2015, Aras Pranckevičius</li>
96 97 <li>Copyright &copy; 2014&ndash;2015, Bradley M. Kuhn</li>
97 98 <li>Copyright &copy; 2014&ndash;2015, Christian Oyarzun</li>
98 99 <li>Copyright &copy; 2014&ndash;2015, Joseph Rivera</li>
99 100 <li>Copyright &copy; 2014&ndash;2015, Sean Farley</li>
100 101 <li>Copyright &copy; 2015, Anatoly Bubenkov</li>
101 102 <li>Copyright &copy; 2015, Andrew Bartlett</li>
102 103 <li>Copyright &copy; 2015, BalÑzs Úr</li>
103 104 <li>Copyright &copy; 2015, Ben Finney</li>
104 105 <li>Copyright &copy; 2015, Daniel Hobley</li>
105 106 <li>Copyright &copy; 2015, David Avigni</li>
106 107 <li>Copyright &copy; 2015, Denis Blanchette</li>
107 108 <li>Copyright &copy; 2015, duanhongyi</li>
108 109 <li>Copyright &copy; 2015, EriCSN Chang</li>
109 110 <li>Copyright &copy; 2015, Grzegorz Krason</li>
110 111 <li>Copyright &copy; 2015, JiΕ™Γ­ Suchan</li>
111 112 <li>Copyright &copy; 2015, Kazunari Kobayashi</li>
112 113 <li>Copyright &copy; 2015, Kevin Bullock</li>
113 114 <li>Copyright &copy; 2015, kobanari</li>
114 115 <li>Copyright &copy; 2015, Marc Abramowitz</li>
115 116 <li>Copyright &copy; 2015, Marc Villetard</li>
116 117 <li>Copyright &copy; 2015, Matthias Zilk</li>
117 118 <li>Copyright &copy; 2015, Michael Pohl</li>
118 119 <li>Copyright &copy; 2015, Michael V. DePalatis</li>
119 120 <li>Copyright &copy; 2015, Morten Skaaning</li>
120 121 <li>Copyright &copy; 2015, Nick High</li>
121 122 <li>Copyright &copy; 2015, Niemand Jedermann</li>
122 123 <li>Copyright &copy; 2015, Peter Vitt</li>
123 124 <li>Copyright &copy; 2015, Ronny Pfannschmidt</li>
124 125 <li>Copyright &copy; 2015, Tuux</li>
125 126 <li>Copyright &copy; 2015, Viktar Palstsiuk</li>
126 127 <li>Copyright &copy; 2014, Ante Ilic</li>
127 128 <li>Copyright &copy; 2014, Calinou</li>
128 129 <li>Copyright &copy; 2014, Daniel Anderson</li>
129 130 <li>Copyright &copy; 2014, Henrik Stuart</li>
130 131 <li>Copyright &copy; 2014, Ingo von Borstel</li>
131 132 <li>Copyright &copy; 2014, invision70</li>
132 133 <li>Copyright &copy; 2014, Jelmer VernooΔ³</li>
133 134 <li>Copyright &copy; 2014, Jim Hague</li>
134 135 <li>Copyright &copy; 2014, Matt Fellows</li>
135 136 <li>Copyright &copy; 2014, Max Roman</li>
136 137 <li>Copyright &copy; 2014, Na'Tosha Bard</li>
137 138 <li>Copyright &copy; 2014, Rasmus Selsmark</li>
138 139 <li>Copyright &copy; 2014, SkryabinD</li>
139 140 <li>Copyright &copy; 2014, Tim Freund</li>
140 141 <li>Copyright &copy; 2014, Travis Burtrum</li>
141 142 <li>Copyright &copy; 2014, whosaysni</li>
142 143 <li>Copyright &copy; 2014, Zoltan Gyarmati</li>
143 144 <li>Copyright &copy; 2010&ndash;2013, Marcin KuΕΊmiΕ„ski</li>
144 145 <li>Copyright &copy; 2010&ndash;2013, RhodeCode GmbH</li>
145 146 <li>Copyright &copy; 2011, 2013, Aparkar</li>
146 147 <li>Copyright &copy; 2012&ndash;2013, Nemcio</li>
147 148 <li>Copyright &copy; 2012&ndash;2013, xpol</li>
148 149 <li>Copyright &copy; 2013, Andrey Mivrenik</li>
149 150 <li>Copyright &copy; 2013, ArcheR</li>
150 151 <li>Copyright &copy; 2013, Dennis Brakhane</li>
151 152 <li>Copyright &copy; 2013, gnustavo</li>
152 153 <li>Copyright &copy; 2013, Grzegorz RoΕΌniecki</li>
153 154 <li>Copyright &copy; 2013, Ilya Beda</li>
154 155 <li>Copyright &copy; 2013, ivlevdenis</li>
155 156 <li>Copyright &copy; 2013, Jonathan Sternberg</li>
156 157 <li>Copyright &copy; 2013, Leonardo Carneiro</li>
157 158 <li>Copyright &copy; 2013, Magnus Ericmats</li>
158 159 <li>Copyright &copy; 2013, Martin Vium</li>
159 160 <li>Copyright &copy; 2013, Mikhail Zholobov</li>
160 161 <li>Copyright &copy; 2013, mokeev1995</li>
161 162 <li>Copyright &copy; 2013, Ruslan Bekenev</li>
162 163 <li>Copyright &copy; 2013, shirou - しろう</li>
163 164 <li>Copyright &copy; 2013, Simon Lopez</li>
164 165 <li>Copyright &copy; 2013, softforwinxp</li>
165 166 <li>Copyright &copy; 2013, stephanj</li>
166 167 <li>Copyright &copy; 2013, zhmylove</li>
167 168 <li>Copyright &copy; 2013, こいんとす</li>
168 169 <li>Copyright &copy; 2011&ndash;2012, Augusto Herrmann</li>
169 170 <li>Copyright &copy; 2012, Dan Sheridan</li>
170 171 <li>Copyright &copy; 2012, H Waldo G</li>
171 172 <li>Copyright &copy; 2012, hppj</li>
172 173 <li>Copyright &copy; 2012, Indra Talip</li>
173 174 <li>Copyright &copy; 2012, mikespook</li>
174 175 <li>Copyright &copy; 2012, nansenat16</li>
175 176 <li>Copyright &copy; 2012, Philip Jameson</li>
176 177 <li>Copyright &copy; 2012, Raoul Thill</li>
177 178 <li>Copyright &copy; 2012, Tony Bussieres</li>
178 179 <li>Copyright &copy; 2012, Vincent Duvert</li>
179 180 <li>Copyright &copy; 2012, Vladislav Poluhin</li>
180 181 <li>Copyright &copy; 2012, Zachary Auclair</li>
181 182 <li>Copyright &copy; 2011, Ankit Solanki</li>
182 183 <li>Copyright &copy; 2011, Dmitri Kuznetsov</li>
183 184 <li>Copyright &copy; 2011, Jared Bunting</li>
184 185 <li>Copyright &copy; 2011, Jason Harris</li>
185 186 <li>Copyright &copy; 2011, Les Peabody</li>
186 187 <li>Copyright &copy; 2011, Liad Shani</li>
187 188 <li>Copyright &copy; 2011, Lorenzo M. Catucci</li>
188 189 <li>Copyright &copy; 2011, Matt Zuba</li>
189 190 <li>Copyright &copy; 2011, Nicolas VINOT</li>
190 191 <li>Copyright &copy; 2011, Shawn K. O'Shea</li>
191 192 <li>Copyright &copy; 2010, Łukasz Balcerzak</li>
192 193
193 194 ## We did not list the following copyright holders, given that they appeared
194 195 ## to use for-profit company affiliations in their contribution in the
195 196 ## Mercurial log and therefore I didn't know if copyright was theirs or
196 197 ## their company's.
197 198 ## Copyright &copy; 2011 Thayne Harbaugh <thayne@fusionio.com>
198 199 ## Copyright &copy; 2012 Dies Koper <diesk@fast.au.fujitsu.com>
199 200 ## Copyright &copy; 2012 Erwin Kroon <e.kroon@smartmetersolutions.nl>
200 201 ## Copyright &copy; 2012 Vincent Caron <vcaron@bearstech.com>
201 202 ##
202 203 ## These contributors' contributions may not be copyrightable:
203 204 ## philip.j@hostdime.com in 2012
204 205 ## Stefan Engel <mail@engel-stefan.de> in 2012
205 206 ## Ton Plomp <tcplomp@gmail.com> in 2013
206 207 ##
207 208 </ul>
208 209
209 210 <p>The above are the copyright holders who have submitted direct
210 211 contributions to the Kallithea repository.</p>
211 212
212 213 <p>In the <a href="https://kallithea-scm.org/repos/kallithea">Kallithea
213 214 source code</a>, there is a
214 215 <a href="https://kallithea-scm.org/repos/kallithea/files/tip/LICENSE.md">list
215 216 of third-party libraries and code that Kallithea incorporates</a>.</p>
216 217
217 218 <p>The front-end contains a <a href="${h.url('/LICENSES.txt')}">list of
218 219 software that is used to build the front-end</a> but isn't distributed as a
219 220 part of Kallithea.</p>
220 221
221 222 </div>
222 223 </div>
223 224
224 225 </%def>
@@ -1,2921 +1,2921 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14
15 15 """
16 16 Tests for the JSON-RPC web api.
17 17 """
18 18
19 19 import datetime
20 20 import os
21 21 import random
22 22 import re
23 23 import string
24 24 from typing import Sized
25 25
26 26 import mock
27 27 import pytest
28 28 from webtest import TestApp
29 29
30 30 from kallithea.lib import ext_json
31 31 from kallithea.lib.auth import AuthUser
32 32 from kallithea.lib.utils2 import ascii_bytes
33 33 from kallithea.model import db, meta
34 34 from kallithea.model.changeset_status import ChangesetStatusModel
35 35 from kallithea.model.gist import GistModel
36 36 from kallithea.model.pull_request import PullRequestModel
37 37 from kallithea.model.repo import RepoModel
38 38 from kallithea.model.repo_group import RepoGroupModel
39 39 from kallithea.model.scm import ScmModel
40 40 from kallithea.model.user import UserModel
41 41 from kallithea.model.user_group import UserGroupModel
42 42 from kallithea.tests import base
43 43 from kallithea.tests.fixture import Fixture, raise_exception
44 44
45 45
46 46 API_URL = '/_admin/api'
47 47 TEST_USER_GROUP = 'test_user_group'
48 48 TEST_REPO_GROUP = 'test_repo_group'
49 49
50 50 fixture = Fixture()
51 51
52 52
53 53 def _build_data(apikey, method, **kw):
54 54 """
55 55 Builds API data with given random ID
56 56 For convenience, the json is returned as str
57 57 """
58 58 random_id = random.randrange(1, 9999)
59 59 return random_id, ext_json.dumps({
60 60 "id": random_id,
61 61 "api_key": apikey,
62 62 "method": method,
63 63 "args": kw
64 64 })
65 65
66 66
67 67 jsonify = lambda obj: ext_json.loads(ext_json.dumps(obj))
68 68
69 69
70 70 def api_call(test_obj, params):
71 71 response = test_obj.app.post(API_URL, content_type='application/json',
72 72 params=params)
73 73 return response
74 74
75 75
76 76 ## helpers
77 77 def make_user_group(name=TEST_USER_GROUP):
78 78 gr = fixture.create_user_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
79 79 UserGroupModel().add_user_to_group(user_group=gr,
80 80 user=base.TEST_USER_ADMIN_LOGIN)
81 81 meta.Session().commit()
82 82 return gr
83 83
84 84
85 85 def make_repo_group(name=TEST_REPO_GROUP):
86 86 gr = fixture.create_repo_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
87 87 meta.Session().commit()
88 88 return gr
89 89
90 90
91 91 class _BaseTestApi(object):
92 92 app: TestApp # assigned by app_fixture in subclass TestController mixin
93 93 # assigned in subclass:
94 94 REPO: str
95 95 REPO_TYPE: str
96 96 TEST_REVISION: str
97 97 TEST_PR_SRC: str
98 98 TEST_PR_DST: str
99 99 TEST_PR_REVISIONS: Sized
100 100
101 101 @classmethod
102 102 def setup_class(cls):
103 103 cls.usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
104 104 cls.apikey = cls.usr.api_key
105 105 cls.test_user = UserModel().create_or_update(
106 106 username='test-api',
107 107 password='test',
108 108 email='test@example.com',
109 109 firstname='first',
110 110 lastname='last'
111 111 )
112 112 meta.Session().commit()
113 113 cls.TEST_USER_LOGIN = cls.test_user.username
114 114 cls.apikey_regular = cls.test_user.api_key
115 115
116 116 @classmethod
117 117 def teardown_class(cls):
118 118 pass
119 119
120 120 def setup_method(self, method):
121 121 make_user_group()
122 122 make_repo_group()
123 123
124 124 def teardown_method(self, method):
125 125 fixture.destroy_user_group(TEST_USER_GROUP)
126 126 fixture.destroy_gists()
127 127 fixture.destroy_repo_group(TEST_REPO_GROUP)
128 128
129 129 def _compare_ok(self, id_, expected, given):
130 130 expected = jsonify({
131 131 'id': id_,
132 132 'error': None,
133 133 'result': expected
134 134 })
135 135 given = ext_json.loads(given)
136 136 assert expected == given, (expected, given)
137 137
138 138 def _compare_error(self, id_, expected, given):
139 139 expected = jsonify({
140 140 'id': id_,
141 141 'error': expected,
142 142 'result': None
143 143 })
144 144 given = ext_json.loads(given)
145 145 assert expected == given, (expected, given)
146 146
147 147 def test_api_wrong_key(self):
148 148 id_, params = _build_data('trololo', 'get_user')
149 149 response = api_call(self, params)
150 150
151 151 expected = 'Invalid API key'
152 152 self._compare_error(id_, expected, given=response.body)
153 153
154 154 def test_api_missing_non_optional_param(self):
155 155 id_, params = _build_data(self.apikey, 'get_repo')
156 156 response = api_call(self, params)
157 157
158 158 expected = 'Missing non optional `repoid` arg in JSON DATA'
159 159 self._compare_error(id_, expected, given=response.body)
160 160
161 161 def test_api_missing_non_optional_param_args_null(self):
162 162 id_, params = _build_data(self.apikey, 'get_repo')
163 163 params = params.replace('"args": {}', '"args": null')
164 164 response = api_call(self, params)
165 165
166 166 expected = 'Missing non optional `repoid` arg in JSON DATA'
167 167 self._compare_error(id_, expected, given=response.body)
168 168
169 169 def test_api_missing_non_optional_param_args_bad(self):
170 170 id_, params = _build_data(self.apikey, 'get_repo')
171 171 params = params.replace('"args": {}', '"args": 1')
172 172 response = api_call(self, params)
173 173
174 174 expected = 'Missing non optional `repoid` arg in JSON DATA'
175 175 self._compare_error(id_, expected, given=response.body)
176 176
177 177 def test_api_args_is_null(self):
178 178 id_, params = _build_data(self.apikey, 'get_users', )
179 179 params = params.replace('"args": {}', '"args": null')
180 180 response = api_call(self, params)
181 181 assert response.status == '200 OK'
182 182
183 183 def test_api_args_is_bad(self):
184 184 id_, params = _build_data(self.apikey, 'get_users', )
185 185 params = params.replace('"args": {}', '"args": 1')
186 186 response = api_call(self, params)
187 187 assert response.status == '200 OK'
188 188
189 189 def test_api_args_different_args(self):
190 190 expected = {
191 191 'ascii_letters': string.ascii_letters,
192 192 'ws': string.whitespace,
193 193 'printables': string.printable
194 194 }
195 195 id_, params = _build_data(self.apikey, 'test', args=expected)
196 196 response = api_call(self, params)
197 197 assert response.status == '200 OK'
198 198 self._compare_ok(id_, expected, response.body)
199 199
200 200 def test_api_get_users(self):
201 201 id_, params = _build_data(self.apikey, 'get_users', )
202 202 response = api_call(self, params)
203 203 ret_all = []
204 204 _users = db.User.query().filter_by(is_default_user=False) \
205 205 .order_by(db.User.username).all()
206 206 for usr in _users:
207 207 ret = usr.get_api_data()
208 208 ret_all.append(jsonify(ret))
209 209 expected = ret_all
210 210 self._compare_ok(id_, expected, given=response.body)
211 211
212 212 def test_api_get_user(self):
213 213 id_, params = _build_data(self.apikey, 'get_user',
214 214 userid=base.TEST_USER_ADMIN_LOGIN)
215 215 response = api_call(self, params)
216 216
217 217 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
218 218 ret = usr.get_api_data()
219 219 ret['permissions'] = AuthUser(dbuser=usr).permissions
220 220
221 221 expected = ret
222 222 self._compare_ok(id_, expected, given=response.body)
223 223
224 224 def test_api_get_user_that_does_not_exist(self):
225 225 id_, params = _build_data(self.apikey, 'get_user',
226 226 userid='trololo')
227 227 response = api_call(self, params)
228 228
229 229 expected = "user `%s` does not exist" % 'trololo'
230 230 self._compare_error(id_, expected, given=response.body)
231 231
232 232 def test_api_get_user_without_giving_userid(self):
233 233 id_, params = _build_data(self.apikey, 'get_user')
234 234 response = api_call(self, params)
235 235
236 236 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
237 237 ret = usr.get_api_data()
238 238 ret['permissions'] = AuthUser(dbuser=usr).permissions
239 239
240 240 expected = ret
241 241 self._compare_ok(id_, expected, given=response.body)
242 242
243 243 def test_api_get_user_without_giving_userid_non_admin(self):
244 244 id_, params = _build_data(self.apikey_regular, 'get_user')
245 245 response = api_call(self, params)
246 246
247 247 usr = db.User.get_by_username(self.TEST_USER_LOGIN)
248 248 ret = usr.get_api_data()
249 249 ret['permissions'] = AuthUser(dbuser=usr).permissions
250 250
251 251 expected = ret
252 252 self._compare_ok(id_, expected, given=response.body)
253 253
254 254 def test_api_get_user_with_giving_userid_non_admin(self):
255 255 id_, params = _build_data(self.apikey_regular, 'get_user',
256 256 userid=self.TEST_USER_LOGIN)
257 257 response = api_call(self, params)
258 258
259 259 expected = 'userid is not the same as your user'
260 260 self._compare_error(id_, expected, given=response.body)
261 261
262 262 def test_api_pull_remote(self):
263 263 # Note: pulling from local repos is a misfeature - it will bypass access control
264 264 # ... but ok, if the path already has been set in the database
265 265 repo_name = 'test_pull'
266 266 r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
267 267 # hack around that clone_uri can't be set to to a local path
268 268 # (as shown by test_api_create_repo_clone_uri_local)
269 269 r.clone_uri = os.path.join(db.Ui.get_by_key('paths', '/').ui_value, self.REPO)
270 270 meta.Session().commit()
271 271
272 272 pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == repo_name)]
273 273
274 274 id_, params = _build_data(self.apikey, 'pull',
275 275 repoid=repo_name,)
276 276 response = api_call(self, params)
277 277
278 278 expected = {'msg': 'Pulled from `%s`' % repo_name,
279 279 'repository': repo_name}
280 280 self._compare_ok(id_, expected, given=response.body)
281 281
282 282 post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == repo_name)]
283 283
284 284 fixture.destroy_repo(repo_name)
285 285
286 286 assert pre_cached_tip != post_cached_tip
287 287
288 288 def test_api_pull_fork(self):
289 289 fork_name = 'fork'
290 290 fixture.create_fork(self.REPO, fork_name)
291 291 id_, params = _build_data(self.apikey, 'pull',
292 292 repoid=fork_name,)
293 293 response = api_call(self, params)
294 294
295 295 expected = {'msg': 'Pulled from `%s`' % fork_name,
296 296 'repository': fork_name}
297 297 self._compare_ok(id_, expected, given=response.body)
298 298
299 299 fixture.destroy_repo(fork_name)
300 300
301 301 def test_api_pull_error_no_remote_no_fork(self):
302 302 # should fail because no clone_uri is set
303 303 id_, params = _build_data(self.apikey, 'pull',
304 304 repoid=self.REPO, )
305 305 response = api_call(self, params)
306 306
307 307 expected = 'Unable to pull changes from `%s`' % self.REPO
308 308 self._compare_error(id_, expected, given=response.body)
309 309
310 310 def test_api_pull_custom_remote(self):
311 311 repo_name = 'test_pull_custom_remote'
312 312 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
313 313
314 314 custom_remote_path = os.path.join(db.Ui.get_by_key('paths', '/').ui_value, self.REPO)
315 315
316 316 id_, params = _build_data(self.apikey, 'pull',
317 317 repoid=repo_name,
318 318 clone_uri=custom_remote_path)
319 319 response = api_call(self, params)
320 320
321 321 expected = {'msg': 'Pulled from `%s`' % repo_name,
322 322 'repository': repo_name}
323 323 self._compare_ok(id_, expected, given=response.body)
324 324
325 325 fixture.destroy_repo(repo_name)
326 326
327 327 def test_api_rescan_repos(self):
328 328 id_, params = _build_data(self.apikey, 'rescan_repos')
329 329 response = api_call(self, params)
330 330
331 331 expected = {'added': [], 'removed': []}
332 332 self._compare_ok(id_, expected, given=response.body)
333 333
334 334 @mock.patch.object(ScmModel, 'repo_scan', raise_exception)
335 335 def test_api_rescann_error(self):
336 336 id_, params = _build_data(self.apikey, 'rescan_repos', )
337 337 response = api_call(self, params)
338 338
339 339 expected = 'Error occurred during rescan repositories action'
340 340 self._compare_error(id_, expected, given=response.body)
341 341
342 342 def test_api_create_existing_user(self):
343 343 id_, params = _build_data(self.apikey, 'create_user',
344 344 username=base.TEST_USER_ADMIN_LOGIN,
345 345 email='test@example.com',
346 346 password='trololo')
347 347 response = api_call(self, params)
348 348
349 349 expected = "user `%s` already exist" % base.TEST_USER_ADMIN_LOGIN
350 350 self._compare_error(id_, expected, given=response.body)
351 351
352 352 def test_api_create_user_with_existing_email(self):
353 353 id_, params = _build_data(self.apikey, 'create_user',
354 354 username=base.TEST_USER_ADMIN_LOGIN + 'new',
355 355 email=base.TEST_USER_REGULAR_EMAIL,
356 356 password='trololo')
357 357 response = api_call(self, params)
358 358
359 359 expected = "email `%s` already exist" % base.TEST_USER_REGULAR_EMAIL
360 360 self._compare_error(id_, expected, given=response.body)
361 361
362 362 def test_api_create_user(self):
363 363 username = 'test_new_api_user'
364 364 email = username + "@example.com"
365 365
366 366 id_, params = _build_data(self.apikey, 'create_user',
367 367 username=username,
368 368 email=email,
369 369 password='trololo')
370 370 response = api_call(self, params)
371 371
372 372 usr = db.User.get_by_username(username)
373 373 ret = dict(
374 374 msg='created new user `%s`' % username,
375 375 user=jsonify(usr.get_api_data())
376 376 )
377 377
378 378 try:
379 379 expected = ret
380 380 self._compare_ok(id_, expected, given=response.body)
381 381 finally:
382 382 fixture.destroy_user(usr.user_id)
383 383
384 384 def test_api_create_user_without_password(self):
385 385 username = 'test_new_api_user_passwordless'
386 386 email = username + "@example.com"
387 387
388 388 id_, params = _build_data(self.apikey, 'create_user',
389 389 username=username,
390 390 email=email)
391 391 response = api_call(self, params)
392 392
393 393 usr = db.User.get_by_username(username)
394 394 ret = dict(
395 395 msg='created new user `%s`' % username,
396 396 user=jsonify(usr.get_api_data())
397 397 )
398 398 try:
399 399 expected = ret
400 400 self._compare_ok(id_, expected, given=response.body)
401 401 finally:
402 402 fixture.destroy_user(usr.user_id)
403 403
404 404 def test_api_create_user_with_extern_name(self):
405 405 username = 'test_new_api_user_passwordless'
406 406 email = username + "@example.com"
407 407
408 408 id_, params = _build_data(self.apikey, 'create_user',
409 409 username=username,
410 410 email=email, extern_name='internal')
411 411 response = api_call(self, params)
412 412
413 413 usr = db.User.get_by_username(username)
414 414 ret = dict(
415 415 msg='created new user `%s`' % username,
416 416 user=jsonify(usr.get_api_data())
417 417 )
418 418 try:
419 419 expected = ret
420 420 self._compare_ok(id_, expected, given=response.body)
421 421 finally:
422 422 fixture.destroy_user(usr.user_id)
423 423
424 424 @mock.patch.object(UserModel, 'create_or_update', raise_exception)
425 425 def test_api_create_user_when_exception_happened(self):
426 426
427 427 username = 'test_new_api_user'
428 428 email = username + "@example.com"
429 429
430 430 id_, params = _build_data(self.apikey, 'create_user',
431 431 username=username,
432 432 email=email,
433 433 password='trololo')
434 434 response = api_call(self, params)
435 435 expected = 'failed to create user `%s`' % username
436 436 self._compare_error(id_, expected, given=response.body)
437 437
438 438 def test_api_delete_user(self):
439 439 usr = UserModel().create_or_update(username='test_user',
440 440 password='qweqwe',
441 441 email='u232@example.com',
442 442 firstname='u1', lastname='u1')
443 443 meta.Session().commit()
444 444 username = usr.username
445 445 email = usr.email
446 446 usr_id = usr.user_id
447 447 ## DELETE THIS USER NOW
448 448
449 449 id_, params = _build_data(self.apikey, 'delete_user',
450 450 userid=username, )
451 451 response = api_call(self, params)
452 452
453 453 ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
454 454 'user': None}
455 455 expected = ret
456 456 self._compare_ok(id_, expected, given=response.body)
457 457
458 458 @mock.patch.object(UserModel, 'delete', raise_exception)
459 459 def test_api_delete_user_when_exception_happened(self):
460 460 usr = UserModel().create_or_update(username='test_user',
461 461 password='qweqwe',
462 462 email='u232@example.com',
463 463 firstname='u1', lastname='u1')
464 464 meta.Session().commit()
465 465 username = usr.username
466 466
467 467 id_, params = _build_data(self.apikey, 'delete_user',
468 468 userid=username, )
469 469 response = api_call(self, params)
470 470 ret = 'failed to delete user ID:%s %s' % (usr.user_id,
471 471 usr.username)
472 472 expected = ret
473 473 self._compare_error(id_, expected, given=response.body)
474 474
475 475 @base.parametrize('name,expected', [
476 476 ('firstname', 'new_username'),
477 477 ('lastname', 'new_username'),
478 478 ('email', 'new_username'),
479 479 ('admin', True),
480 480 ('admin', False),
481 481 ('extern_type', 'ldap'),
482 482 ('extern_type', None),
483 483 ('extern_name', 'test'),
484 484 ('extern_name', None),
485 485 ('active', False),
486 486 ('active', True),
487 487 ('password', 'newpass'),
488 488 ])
489 489 def test_api_update_user(self, name, expected):
490 490 usr = db.User.get_by_username(self.TEST_USER_LOGIN)
491 491 kw = {name: expected,
492 492 'userid': usr.user_id}
493 493 id_, params = _build_data(self.apikey, 'update_user', **kw)
494 494 response = api_call(self, params)
495 495
496 496 ret = {
497 497 'msg': 'updated user ID:%s %s' % (
498 498 usr.user_id, self.TEST_USER_LOGIN),
499 499 'user': jsonify(db.User \
500 500 .get_by_username(self.TEST_USER_LOGIN) \
501 501 .get_api_data())
502 502 }
503 503
504 504 expected = ret
505 505 self._compare_ok(id_, expected, given=response.body)
506 506
507 507 def test_api_update_user_no_changed_params(self):
508 508 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
509 509 ret = jsonify(usr.get_api_data())
510 510 id_, params = _build_data(self.apikey, 'update_user',
511 511 userid=base.TEST_USER_ADMIN_LOGIN)
512 512
513 513 response = api_call(self, params)
514 514 ret = {
515 515 'msg': 'updated user ID:%s %s' % (
516 516 usr.user_id, base.TEST_USER_ADMIN_LOGIN),
517 517 'user': ret
518 518 }
519 519 expected = ret
520 520 self._compare_ok(id_, expected, given=response.body)
521 521
522 522 def test_api_update_user_by_user_id(self):
523 523 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
524 524 ret = jsonify(usr.get_api_data())
525 525 id_, params = _build_data(self.apikey, 'update_user',
526 526 userid=usr.user_id)
527 527
528 528 response = api_call(self, params)
529 529 ret = {
530 530 'msg': 'updated user ID:%s %s' % (
531 531 usr.user_id, base.TEST_USER_ADMIN_LOGIN),
532 532 'user': ret
533 533 }
534 534 expected = ret
535 535 self._compare_ok(id_, expected, given=response.body)
536 536
537 537 def test_api_update_user_default_user(self):
538 538 usr = db.User.get_default_user()
539 539 id_, params = _build_data(self.apikey, 'update_user',
540 540 userid=usr.user_id)
541 541
542 542 response = api_call(self, params)
543 543 expected = 'editing default user is forbidden'
544 544 self._compare_error(id_, expected, given=response.body)
545 545
546 546 @mock.patch.object(UserModel, 'update_user', raise_exception)
547 547 def test_api_update_user_when_exception_happens(self):
548 548 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
549 549 ret = jsonify(usr.get_api_data())
550 550 id_, params = _build_data(self.apikey, 'update_user',
551 551 userid=usr.user_id)
552 552
553 553 response = api_call(self, params)
554 554 ret = 'failed to update user `%s`' % usr.user_id
555 555
556 556 expected = ret
557 557 self._compare_error(id_, expected, given=response.body)
558 558
559 559 def test_api_get_repo(self):
560 560 new_group = 'some_new_group'
561 561 make_user_group(new_group)
562 562 RepoModel().grant_user_group_permission(repo=self.REPO,
563 563 group_name=new_group,
564 564 perm='repository.read')
565 565 meta.Session().commit()
566 566 id_, params = _build_data(self.apikey, 'get_repo',
567 567 repoid=self.REPO)
568 568 response = api_call(self, params)
569 569 assert "tags" not in response.json['result']
570 570 assert 'pull_requests' not in response.json['result']
571 571
572 572 repo = RepoModel().get_by_repo_name(self.REPO)
573 573 ret = repo.get_api_data()
574 574
575 575 members = []
576 576 followers = []
577 577 for user in repo.repo_to_perm:
578 578 perm = user.permission.permission_name
579 579 user = user.user
580 580 user_data = {'name': user.username, 'type': "user",
581 581 'permission': perm}
582 582 members.append(user_data)
583 583
584 584 for user_group in repo.users_group_to_perm:
585 585 perm = user_group.permission.permission_name
586 586 user_group = user_group.users_group
587 587 user_group_data = {'name': user_group.users_group_name,
588 588 'type': "user_group", 'permission': perm}
589 589 members.append(user_group_data)
590 590
591 591 for user in repo.followers:
592 592 followers.append(user.user.get_api_data())
593 593
594 594 ret['members'] = members
595 595 ret['followers'] = followers
596 596
597 597 expected = ret
598 598 self._compare_ok(id_, expected, given=response.body)
599 599 fixture.destroy_user_group(new_group)
600 600
601 601 id_, params = _build_data(self.apikey, 'get_repo', repoid=self.REPO,
602 602 with_revision_names=True,
603 603 with_pullrequests=True)
604 604 response = api_call(self, params)
605 605 assert "v0.2.0" in response.json['result']['tags']
606 606 assert 'pull_requests' in response.json['result']
607 607
608 608 @base.parametrize('grant_perm', [
609 609 ('repository.admin'),
610 610 ('repository.write'),
611 611 ('repository.read'),
612 612 ])
613 613 def test_api_get_repo_by_non_admin(self, grant_perm):
614 614 RepoModel().grant_user_permission(repo=self.REPO,
615 615 user=self.TEST_USER_LOGIN,
616 616 perm=grant_perm)
617 617 meta.Session().commit()
618 618 id_, params = _build_data(self.apikey_regular, 'get_repo',
619 619 repoid=self.REPO)
620 620 response = api_call(self, params)
621 621
622 622 repo = RepoModel().get_by_repo_name(self.REPO)
623 623 assert len(repo.repo_to_perm) >= 2 # make sure we actually are testing something - probably the default 2 permissions, possibly more
624 624
625 625 expected = repo.get_api_data()
626 626
627 627 members = []
628 628 for user in repo.repo_to_perm:
629 629 perm = user.permission.permission_name
630 630 user_obj = user.user
631 631 user_data = {'name': user_obj.username, 'type': "user",
632 632 'permission': perm}
633 633 members.append(user_data)
634 634 for user_group in repo.users_group_to_perm:
635 635 perm = user_group.permission.permission_name
636 636 user_group_obj = user_group.users_group
637 637 user_group_data = {'name': user_group_obj.users_group_name,
638 638 'type': "user_group", 'permission': perm}
639 639 members.append(user_group_data)
640 640 expected['members'] = members
641 641
642 642 followers = []
643 643
644 644 for user in repo.followers:
645 645 followers.append(user.user.get_api_data())
646 646
647 647 expected['followers'] = followers
648 648
649 649 try:
650 650 self._compare_ok(id_, expected, given=response.body)
651 651 finally:
652 652 RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
653 653
654 654 def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
655 655 RepoModel().grant_user_permission(repo=self.REPO,
656 656 user=db.User.DEFAULT_USER_NAME,
657 657 perm='repository.none')
658 658 try:
659 659 RepoModel().grant_user_permission(repo=self.REPO,
660 660 user=self.TEST_USER_LOGIN,
661 661 perm='repository.none')
662 662
663 663 id_, params = _build_data(self.apikey_regular, 'get_repo',
664 664 repoid=self.REPO)
665 665 response = api_call(self, params)
666 666
667 667 expected = 'repository `%s` does not exist' % (self.REPO)
668 668 self._compare_error(id_, expected, given=response.body)
669 669 finally:
670 670 RepoModel().grant_user_permission(repo=self.REPO,
671 671 user=db.User.DEFAULT_USER_NAME,
672 672 perm='repository.read')
673 673
674 674 def test_api_get_repo_that_doesn_not_exist(self):
675 675 id_, params = _build_data(self.apikey, 'get_repo',
676 676 repoid='no-such-repo')
677 677 response = api_call(self, params)
678 678
679 679 ret = 'repository `%s` does not exist' % 'no-such-repo'
680 680 expected = ret
681 681 self._compare_error(id_, expected, given=response.body)
682 682
683 683 def test_api_get_repos(self):
684 684 id_, params = _build_data(self.apikey, 'get_repos')
685 685 response = api_call(self, params)
686 686
687 687 expected = jsonify([
688 688 repo.get_api_data()
689 689 for repo in db.Repository.query()
690 690 ])
691 691
692 692 self._compare_ok(id_, expected, given=response.body)
693 693
694 694 def test_api_get_repos_non_admin(self):
695 695 id_, params = _build_data(self.apikey_regular, 'get_repos')
696 696 response = api_call(self, params)
697 697
698 698 expected = jsonify([
699 699 repo.get_api_data()
700 700 for repo in AuthUser(dbuser=db.User.get_by_username(self.TEST_USER_LOGIN)).get_all_user_repos()
701 701 ])
702 702
703 703 self._compare_ok(id_, expected, given=response.body)
704 704
705 705 @base.parametrize('name,ret_type', [
706 706 ('all', 'all'),
707 707 ('dirs', 'dirs'),
708 708 ('files', 'files'),
709 709 ])
710 710 def test_api_get_repo_nodes(self, name, ret_type):
711 711 rev = 'tip'
712 712 path = '/'
713 713 id_, params = _build_data(self.apikey, 'get_repo_nodes',
714 714 repoid=self.REPO, revision=rev,
715 715 root_path=path,
716 716 ret_type=ret_type)
717 717 response = api_call(self, params)
718 718
719 719 # we don't the actual return types here since it's tested somewhere
720 720 # else
721 721 expected = response.json['result']
722 722 self._compare_ok(id_, expected, given=response.body)
723 723
724 724 def test_api_get_repo_nodes_bad_revisions(self):
725 725 rev = 'i-dont-exist'
726 726 path = '/'
727 727 id_, params = _build_data(self.apikey, 'get_repo_nodes',
728 728 repoid=self.REPO, revision=rev,
729 729 root_path=path, )
730 730 response = api_call(self, params)
731 731
732 732 expected = 'failed to get repo: `%s` nodes' % self.REPO
733 733 self._compare_error(id_, expected, given=response.body)
734 734
735 735 def test_api_get_repo_nodes_bad_path(self):
736 736 rev = 'tip'
737 737 path = '/idontexits'
738 738 id_, params = _build_data(self.apikey, 'get_repo_nodes',
739 739 repoid=self.REPO, revision=rev,
740 740 root_path=path, )
741 741 response = api_call(self, params)
742 742
743 743 expected = 'failed to get repo: `%s` nodes' % self.REPO
744 744 self._compare_error(id_, expected, given=response.body)
745 745
746 746 def test_api_get_repo_nodes_bad_ret_type(self):
747 747 rev = 'tip'
748 748 path = '/'
749 749 ret_type = 'error'
750 750 id_, params = _build_data(self.apikey, 'get_repo_nodes',
751 751 repoid=self.REPO, revision=rev,
752 752 root_path=path,
753 753 ret_type=ret_type)
754 754 response = api_call(self, params)
755 755
756 756 expected = ('ret_type must be one of %s'
757 757 % (','.join(sorted(['files', 'dirs', 'all']))))
758 758 self._compare_error(id_, expected, given=response.body)
759 759
760 760 @base.parametrize('name,ret_type,grant_perm', [
761 761 ('all', 'all', 'repository.write'),
762 762 ('dirs', 'dirs', 'repository.admin'),
763 763 ('files', 'files', 'repository.read'),
764 764 ])
765 765 def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
766 766 RepoModel().grant_user_permission(repo=self.REPO,
767 767 user=self.TEST_USER_LOGIN,
768 768 perm=grant_perm)
769 769 meta.Session().commit()
770 770
771 771 rev = 'tip'
772 772 path = '/'
773 773 id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
774 774 repoid=self.REPO, revision=rev,
775 775 root_path=path,
776 776 ret_type=ret_type)
777 777 response = api_call(self, params)
778 778
779 779 # we don't the actual return types here since it's tested somewhere
780 780 # else
781 781 expected = response.json['result']
782 782 try:
783 783 self._compare_ok(id_, expected, given=response.body)
784 784 finally:
785 785 RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
786 786
787 787 @base.parametrize('changing_attr,updates', [
788 788 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
789 789 ('description', {'description': 'new description'}),
790 790 ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
791 791 ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control
792 792 ('clone_uri', {'clone_uri': None}),
793 793 ('landing_rev', {'landing_rev': 'branch:master'}),
794 794 ('private', {'private': True}),
795 795 ('enable_statistics', {'enable_statistics': True}),
796 796 ('enable_downloads', {'enable_downloads': True}),
797 797 ('repo_group', {'group': 'test_group_for_update'}),
798 798 ])
799 799 def test_api_create_repo(self, changing_attr, updates):
800 800 repo_name = repo_name_full = 'new_repo'
801 801
802 802 if changing_attr == 'repo_group':
803 803 group_name = updates['group']
804 804 fixture.create_repo_group(group_name)
805 805 repo_name_full = '/'.join([group_name, repo_name])
806 806 updates = {}
807 807
808 808 id_, params = _build_data(self.apikey, 'create_repo',
809 809 repo_type=self.REPO_TYPE, repo_name=repo_name_full, **updates)
810 810 response = api_call(self, params)
811 811
812 812 try:
813 813 expected = {
814 814 'msg': 'Created new repository `%s`' % repo_name_full,
815 815 'success': True}
816 816 if changing_attr == 'clone_uri' and updates['clone_uri']:
817 817 expected = 'failed to create repository `%s`' % repo_name
818 818 self._compare_error(id_, expected, given=response.body)
819 819 return
820 820 else:
821 821 self._compare_ok(id_, expected, given=response.body)
822 822
823 823 repo = db.Repository.get_by_repo_name(repo_name_full)
824 824 assert repo is not None
825 825
826 826 expected_data = {
827 827 'clone_uri': None,
828 828 'created_on': repo.created_on,
829 829 'description': repo_name,
830 830 'enable_downloads': False,
831 831 'enable_statistics': False,
832 832 'fork_of': None,
833 833 'landing_rev': ['rev', 'tip'],
834 834 'last_changeset': {'author': '',
835 835 'date': datetime.datetime(1970, 1, 1, 0, 0),
836 836 'message': '',
837 837 'raw_id': '0000000000000000000000000000000000000000',
838 838 'revision': -1,
839 839 'short_id': '000000000000'},
840 840 'owner': 'test_admin',
841 841 'private': False,
842 842 'repo_id': repo.repo_id,
843 843 'repo_name': repo_name_full,
844 844 'repo_type': self.REPO_TYPE,
845 845 }
846 846 expected_data.update(updates)
847 847 if changing_attr == 'landing_rev':
848 848 expected_data['landing_rev'] = expected_data['landing_rev'].split(':', 1)
849 849 assert repo.get_api_data() == expected_data
850 850 finally:
851 851 fixture.destroy_repo(repo_name_full)
852 852 if changing_attr == 'repo_group':
853 853 fixture.destroy_repo_group(group_name)
854 854
855 855 @base.parametrize('repo_name', [
856 856 '',
857 857 '.',
858 858 '..',
859 859 ':',
860 860 '/',
861 861 '<test>',
862 862 ])
863 863 def test_api_create_repo_bad_names(self, repo_name):
864 864 id_, params = _build_data(self.apikey, 'create_repo',
865 865 repo_name=repo_name,
866 866 owner=base.TEST_USER_ADMIN_LOGIN,
867 867 repo_type=self.REPO_TYPE,
868 868 )
869 869 response = api_call(self, params)
870 870 if repo_name == '/':
871 871 expected = "repo group `` not found"
872 872 self._compare_error(id_, expected, given=response.body)
873 873 else:
874 874 expected = "failed to create repository `%s`" % repo_name
875 875 self._compare_error(id_, expected, given=response.body)
876 876 fixture.destroy_repo(repo_name)
877 877
878 878 def test_api_create_repo_clone_uri_local(self):
879 879 # cloning from local repos was a misfeature - it would bypass access control
880 880 # TODO: introduce other test coverage of actual remote cloning
881 881 clone_uri = os.path.join(base.TESTS_TMP_PATH, self.REPO)
882 882 repo_name = 'api-repo'
883 883 id_, params = _build_data(self.apikey, 'create_repo',
884 884 repo_name=repo_name,
885 885 owner=base.TEST_USER_ADMIN_LOGIN,
886 886 repo_type=self.REPO_TYPE,
887 887 clone_uri=clone_uri,
888 888 )
889 889 response = api_call(self, params)
890 890 expected = "failed to create repository `%s`" % repo_name
891 891 self._compare_error(id_, expected, given=response.body)
892 892 fixture.destroy_repo(repo_name)
893 893
894 894 def test_api_create_repo_and_repo_group(self):
895 895 repo_group_name = 'my_gr'
896 896 repo_name = '%s/api-repo' % repo_group_name
897 897
898 898 # repo creation can no longer also create repo group
899 899 id_, params = _build_data(self.apikey, 'create_repo',
900 900 repo_name=repo_name,
901 901 owner=base.TEST_USER_ADMIN_LOGIN,
902 902 repo_type=self.REPO_TYPE,)
903 903 response = api_call(self, params)
904 904 expected = 'repo group `%s` not found' % repo_group_name
905 905 self._compare_error(id_, expected, given=response.body)
906 906 assert RepoModel().get_by_repo_name(repo_name) is None
907 907
908 908 # create group before creating repo
909 909 rg = fixture.create_repo_group(repo_group_name)
910 910 meta.Session().commit()
911 911
912 912 id_, params = _build_data(self.apikey, 'create_repo',
913 913 repo_name=repo_name,
914 914 owner=base.TEST_USER_ADMIN_LOGIN,
915 915 repo_type=self.REPO_TYPE,)
916 916 response = api_call(self, params)
917 917 expected = {
918 918 'msg': 'Created new repository `%s`' % repo_name,
919 919 'success': True,
920 920 }
921 921 self._compare_ok(id_, expected, given=response.body)
922 922 repo = RepoModel().get_by_repo_name(repo_name)
923 923 assert repo is not None
924 924
925 925 fixture.destroy_repo(repo_name)
926 926 fixture.destroy_repo_group(repo_group_name)
927 927
928 928 def test_api_create_repo_in_repo_group_without_permission(self):
929 929 repo_group_basename = 'api-repo-repo'
930 930 repo_group_name = '%s/%s' % (TEST_REPO_GROUP, repo_group_basename)
931 931 repo_name = '%s/api-repo' % repo_group_name
932 932
933 933 top_group = db.RepoGroup.get_by_group_name(TEST_REPO_GROUP)
934 934 assert top_group
935 935 rg = fixture.create_repo_group(repo_group_basename, parent_group_id=top_group)
936 936 meta.Session().commit()
937 937 RepoGroupModel().grant_user_permission(repo_group_name,
938 938 self.TEST_USER_LOGIN,
939 939 'group.none')
940 940 meta.Session().commit()
941 941
942 942 id_, params = _build_data(self.apikey_regular, 'create_repo',
943 943 repo_name=repo_name,
944 944 repo_type=self.REPO_TYPE,
945 945 )
946 946 response = api_call(self, params)
947 947
948 948 # API access control match Web access control:
949 949 expected = 'no permission to create repo in test_repo_group/api-repo-repo'
950 950 self._compare_error(id_, expected, given=response.body)
951 951
952 952 fixture.destroy_repo(repo_name)
953 953 fixture.destroy_repo_group(repo_group_name)
954 954
955 955 def test_api_create_repo_unknown_owner(self):
956 956 repo_name = 'api-repo'
957 957 owner = 'i-dont-exist'
958 958 id_, params = _build_data(self.apikey, 'create_repo',
959 959 repo_name=repo_name,
960 960 owner=owner,
961 961 repo_type=self.REPO_TYPE,
962 962 )
963 963 response = api_call(self, params)
964 964 expected = 'user `%s` does not exist' % owner
965 965 self._compare_error(id_, expected, given=response.body)
966 966
967 967 def test_api_create_repo_dont_specify_owner(self):
968 968 repo_name = 'api-repo'
969 969 owner = 'i-dont-exist'
970 970 id_, params = _build_data(self.apikey, 'create_repo',
971 971 repo_name=repo_name,
972 972 repo_type=self.REPO_TYPE,
973 973 )
974 974 response = api_call(self, params)
975 975
976 976 repo = RepoModel().get_by_repo_name(repo_name)
977 977 assert repo is not None
978 978 ret = {
979 979 'msg': 'Created new repository `%s`' % repo_name,
980 980 'success': True,
981 981 }
982 982 expected = ret
983 983 self._compare_ok(id_, expected, given=response.body)
984 984 fixture.destroy_repo(repo_name)
985 985
986 986 def test_api_create_repo_by_non_admin(self):
987 987 repo_name = 'api-repo'
988 988 owner = 'i-dont-exist'
989 989 id_, params = _build_data(self.apikey_regular, 'create_repo',
990 990 repo_name=repo_name,
991 991 repo_type=self.REPO_TYPE,
992 992 )
993 993 response = api_call(self, params)
994 994
995 995 repo = RepoModel().get_by_repo_name(repo_name)
996 996 assert repo is not None
997 997 ret = {
998 998 'msg': 'Created new repository `%s`' % repo_name,
999 999 'success': True,
1000 1000 }
1001 1001 expected = ret
1002 1002 self._compare_ok(id_, expected, given=response.body)
1003 1003 fixture.destroy_repo(repo_name)
1004 1004
1005 1005 def test_api_create_repo_by_non_admin_specify_owner(self):
1006 1006 repo_name = 'api-repo'
1007 1007 owner = 'i-dont-exist'
1008 1008 id_, params = _build_data(self.apikey_regular, 'create_repo',
1009 1009 repo_name=repo_name,
1010 1010 repo_type=self.REPO_TYPE,
1011 1011 owner=owner)
1012 1012 response = api_call(self, params)
1013 1013
1014 1014 expected = 'Only Kallithea admin can specify `owner` param'
1015 1015 self._compare_error(id_, expected, given=response.body)
1016 1016 fixture.destroy_repo(repo_name)
1017 1017
1018 1018 def test_api_create_repo_exists(self):
1019 1019 repo_name = self.REPO
1020 1020 id_, params = _build_data(self.apikey, 'create_repo',
1021 1021 repo_name=repo_name,
1022 1022 owner=base.TEST_USER_ADMIN_LOGIN,
1023 1023 repo_type=self.REPO_TYPE,)
1024 1024 response = api_call(self, params)
1025 1025 expected = "repo `%s` already exist" % repo_name
1026 1026 self._compare_error(id_, expected, given=response.body)
1027 1027
1028 1028 def test_api_create_repo_dot_dot(self):
1029 1029 # it is only possible to create repositories in existing repo groups - and '..' can't be used
1030 1030 group_name = '%s/..' % TEST_REPO_GROUP
1031 1031 repo_name = '%s/%s' % (group_name, 'could-be-outside')
1032 1032 id_, params = _build_data(self.apikey, 'create_repo',
1033 1033 repo_name=repo_name,
1034 1034 owner=base.TEST_USER_ADMIN_LOGIN,
1035 1035 repo_type=self.REPO_TYPE,)
1036 1036 response = api_call(self, params)
1037 1037 expected = 'repo group `%s` not found' % group_name
1038 1038 self._compare_error(id_, expected, given=response.body)
1039 1039 fixture.destroy_repo(repo_name)
1040 1040
1041 1041 @mock.patch.object(RepoModel, 'create', raise_exception)
1042 1042 def test_api_create_repo_exception_occurred(self):
1043 1043 repo_name = 'api-repo'
1044 1044 id_, params = _build_data(self.apikey, 'create_repo',
1045 1045 repo_name=repo_name,
1046 1046 owner=base.TEST_USER_ADMIN_LOGIN,
1047 1047 repo_type=self.REPO_TYPE,)
1048 1048 response = api_call(self, params)
1049 1049 expected = 'failed to create repository `%s`' % repo_name
1050 1050 self._compare_error(id_, expected, given=response.body)
1051 1051
1052 1052 @base.parametrize('changing_attr,updates', [
1053 1053 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
1054 1054 ('description', {'description': 'new description'}),
1055 1055 ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
1056 1056 ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control
1057 1057 ('clone_uri', {'clone_uri': None}),
1058 1058 ('landing_rev', {'landing_rev': 'branch:master'}),
1059 1059 ('private', {'private': True}),
1060 1060 ('enable_statistics', {'enable_statistics': True}),
1061 1061 ('enable_downloads', {'enable_downloads': True}),
1062 1062 ('name', {'name': 'new_repo_name'}),
1063 1063 ('repo_group', {'group': 'test_group_for_update'}),
1064 1064 ])
1065 1065 def test_api_update_repo(self, changing_attr, updates):
1066 1066 repo_name = 'api_update_me'
1067 1067 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1068 1068 if changing_attr == 'repo_group':
1069 1069 fixture.create_repo_group(updates['group'])
1070 1070
1071 1071 id_, params = _build_data(self.apikey, 'update_repo',
1072 1072 repoid=repo_name, **updates)
1073 1073
1074 1074 if changing_attr == 'name':
1075 1075 repo_name = updates['name']
1076 1076 if changing_attr == 'repo_group':
1077 1077 repo_name = '/'.join([updates['group'], repo_name])
1078 1078 expected = {
1079 1079 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
1080 1080 'repository': repo.get_api_data()
1081 1081 }
1082 1082 expected['repository'].update(updates)
1083 1083 if changing_attr == 'clone_uri' and updates['clone_uri'] is None:
1084 1084 expected['repository']['clone_uri'] = ''
1085 1085 if changing_attr == 'landing_rev':
1086 1086 expected['repository']['landing_rev'] = expected['repository']['landing_rev'].split(':', 1)
1087 1087 if changing_attr == 'name':
1088 1088 expected['repository']['repo_name'] = expected['repository'].pop('name')
1089 1089 if changing_attr == 'repo_group':
1090 1090 expected['repository']['repo_name'] = expected['repository'].pop('group') + '/' + repo.repo_name
1091 1091
1092 1092 response = api_call(self, params)
1093 1093
1094 1094 try:
1095 1095 if changing_attr == 'clone_uri' and updates['clone_uri']:
1096 1096 expected = 'failed to update repo `%s`' % repo_name
1097 1097 self._compare_error(id_, expected, given=response.body)
1098 1098 else:
1099 1099 self._compare_ok(id_, expected, given=response.body)
1100 1100 finally:
1101 1101 fixture.destroy_repo(repo_name)
1102 1102 if changing_attr == 'repo_group':
1103 1103 fixture.destroy_repo_group(updates['group'])
1104 1104
1105 1105 @base.parametrize('changing_attr,updates', [
1106 1106 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
1107 1107 ('description', {'description': 'new description'}),
1108 1108 ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
1109 1109 ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a misfeature - it would bypass access control
1110 1110 ('clone_uri', {'clone_uri': None}),
1111 1111 ('landing_rev', {'landing_rev': 'branch:master'}),
1112 1112 ('enable_statistics', {'enable_statistics': True}),
1113 1113 ('enable_downloads', {'enable_downloads': True}),
1114 1114 ('name', {'name': 'new_repo_name'}),
1115 1115 ('repo_group', {'group': 'test_group_for_update'}),
1116 1116 ])
1117 1117 def test_api_update_group_repo(self, changing_attr, updates):
1118 1118 group_name = 'lololo'
1119 1119 fixture.create_repo_group(group_name)
1120 1120 repo_name = '%s/api_update_me' % group_name
1121 1121 repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE)
1122 1122 if changing_attr == 'repo_group':
1123 1123 fixture.create_repo_group(updates['group'])
1124 1124
1125 1125 id_, params = _build_data(self.apikey, 'update_repo',
1126 1126 repoid=repo_name, **updates)
1127 1127 response = api_call(self, params)
1128 1128 if changing_attr == 'name':
1129 1129 repo_name = '%s/%s' % (group_name, updates['name'])
1130 1130 if changing_attr == 'repo_group':
1131 1131 repo_name = '/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
1132 1132 try:
1133 1133 if changing_attr == 'clone_uri' and updates['clone_uri']:
1134 1134 expected = 'failed to update repo `%s`' % repo_name
1135 1135 self._compare_error(id_, expected, given=response.body)
1136 1136 else:
1137 1137 expected = {
1138 1138 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
1139 1139 'repository': repo.get_api_data()
1140 1140 }
1141 1141 self._compare_ok(id_, expected, given=response.body)
1142 1142 finally:
1143 1143 fixture.destroy_repo(repo_name)
1144 1144 if changing_attr == 'repo_group':
1145 1145 fixture.destroy_repo_group(updates['group'])
1146 1146 fixture.destroy_repo_group(group_name)
1147 1147
1148 1148 def test_api_update_repo_repo_group_does_not_exist(self):
1149 1149 repo_name = 'admin_owned'
1150 1150 fixture.create_repo(repo_name)
1151 1151 updates = {'group': 'test_group_for_update'}
1152 1152 id_, params = _build_data(self.apikey, 'update_repo',
1153 1153 repoid=repo_name, **updates)
1154 1154 response = api_call(self, params)
1155 1155 try:
1156 1156 expected = 'repository group `%s` does not exist' % updates['group']
1157 1157 self._compare_error(id_, expected, given=response.body)
1158 1158 finally:
1159 1159 fixture.destroy_repo(repo_name)
1160 1160
1161 1161 def test_api_update_repo_regular_user_not_allowed(self):
1162 1162 repo_name = 'admin_owned'
1163 1163 fixture.create_repo(repo_name)
1164 1164 updates = {'description': 'something else'}
1165 1165 id_, params = _build_data(self.apikey_regular, 'update_repo',
1166 1166 repoid=repo_name, **updates)
1167 1167 response = api_call(self, params)
1168 1168 try:
1169 1169 expected = 'repository `%s` does not exist' % repo_name
1170 1170 self._compare_error(id_, expected, given=response.body)
1171 1171 finally:
1172 1172 fixture.destroy_repo(repo_name)
1173 1173
1174 1174 @mock.patch.object(RepoModel, 'update', raise_exception)
1175 1175 def test_api_update_repo_exception_occurred(self):
1176 1176 repo_name = 'api_update_me'
1177 1177 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1178 1178 id_, params = _build_data(self.apikey, 'update_repo',
1179 1179 repoid=repo_name, owner=base.TEST_USER_ADMIN_LOGIN,)
1180 1180 response = api_call(self, params)
1181 1181 try:
1182 1182 expected = 'failed to update repo `%s`' % repo_name
1183 1183 self._compare_error(id_, expected, given=response.body)
1184 1184 finally:
1185 1185 fixture.destroy_repo(repo_name)
1186 1186
1187 1187 def test_api_update_repo_regular_user_change_top_level_repo_name(self):
1188 1188 repo_name = 'admin_owned'
1189 1189 new_repo_name = 'new_repo_name'
1190 1190 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1191 1191 RepoModel().grant_user_permission(repo=repo_name,
1192 1192 user=self.TEST_USER_LOGIN,
1193 1193 perm='repository.admin')
1194 1194 UserModel().revoke_perm('default', 'hg.create.repository')
1195 1195 UserModel().grant_perm('default', 'hg.create.none')
1196 1196 updates = {'name': new_repo_name}
1197 1197 id_, params = _build_data(self.apikey_regular, 'update_repo',
1198 1198 repoid=repo_name, **updates)
1199 1199 response = api_call(self, params)
1200 1200 try:
1201 1201 expected = 'no permission to create (or move) top level repositories'
1202 1202 self._compare_error(id_, expected, given=response.body)
1203 1203 finally:
1204 1204 fixture.destroy_repo(repo_name)
1205 1205 fixture.destroy_repo(new_repo_name)
1206 1206
1207 1207 def test_api_update_repo_regular_user_change_repo_name_allowed(self):
1208 1208 repo_name = 'admin_owned'
1209 1209 new_repo_name = 'new_repo_name'
1210 1210 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1211 1211 RepoModel().grant_user_permission(repo=repo_name,
1212 1212 user=self.TEST_USER_LOGIN,
1213 1213 perm='repository.admin')
1214 1214 UserModel().revoke_perm('default', 'hg.create.none')
1215 1215 UserModel().grant_perm('default', 'hg.create.repository')
1216 1216 updates = {'name': new_repo_name}
1217 1217 id_, params = _build_data(self.apikey_regular, 'update_repo',
1218 1218 repoid=repo_name, **updates)
1219 1219 response = api_call(self, params)
1220 1220 try:
1221 1221 expected = {
1222 1222 'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
1223 1223 'repository': repo.get_api_data()
1224 1224 }
1225 1225 self._compare_ok(id_, expected, given=response.body)
1226 1226 finally:
1227 1227 fixture.destroy_repo(repo_name)
1228 1228 fixture.destroy_repo(new_repo_name)
1229 1229
1230 1230 def test_api_update_repo_regular_user_change_owner(self):
1231 1231 repo_name = 'admin_owned'
1232 1232 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1233 1233 RepoModel().grant_user_permission(repo=repo_name,
1234 1234 user=self.TEST_USER_LOGIN,
1235 1235 perm='repository.admin')
1236 1236 updates = {'owner': base.TEST_USER_ADMIN_LOGIN}
1237 1237 id_, params = _build_data(self.apikey_regular, 'update_repo',
1238 1238 repoid=repo_name, **updates)
1239 1239 response = api_call(self, params)
1240 1240 try:
1241 1241 expected = 'Only Kallithea admin can specify `owner` param'
1242 1242 self._compare_error(id_, expected, given=response.body)
1243 1243 finally:
1244 1244 fixture.destroy_repo(repo_name)
1245 1245
1246 1246 def test_api_delete_repo(self):
1247 1247 repo_name = 'api_delete_me'
1248 1248 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1249 1249
1250 1250 id_, params = _build_data(self.apikey, 'delete_repo',
1251 1251 repoid=repo_name, )
1252 1252 response = api_call(self, params)
1253 1253
1254 1254 ret = {
1255 1255 'msg': 'Deleted repository `%s`' % repo_name,
1256 1256 'success': True
1257 1257 }
1258 1258 try:
1259 1259 expected = ret
1260 1260 self._compare_ok(id_, expected, given=response.body)
1261 1261 finally:
1262 1262 fixture.destroy_repo(repo_name)
1263 1263
1264 1264 def test_api_delete_repo_by_non_admin(self):
1265 1265 repo_name = 'api_delete_me'
1266 1266 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
1267 1267 cur_user=self.TEST_USER_LOGIN)
1268 1268 id_, params = _build_data(self.apikey_regular, 'delete_repo',
1269 1269 repoid=repo_name, )
1270 1270 response = api_call(self, params)
1271 1271
1272 1272 ret = {
1273 1273 'msg': 'Deleted repository `%s`' % repo_name,
1274 1274 'success': True
1275 1275 }
1276 1276 try:
1277 1277 expected = ret
1278 1278 self._compare_ok(id_, expected, given=response.body)
1279 1279 finally:
1280 1280 fixture.destroy_repo(repo_name)
1281 1281
1282 1282 def test_api_delete_repo_by_non_admin_no_permission(self):
1283 1283 repo_name = 'api_delete_me'
1284 1284 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1285 1285 try:
1286 1286 id_, params = _build_data(self.apikey_regular, 'delete_repo',
1287 1287 repoid=repo_name, )
1288 1288 response = api_call(self, params)
1289 1289 expected = 'repository `%s` does not exist' % (repo_name)
1290 1290 self._compare_error(id_, expected, given=response.body)
1291 1291 finally:
1292 1292 fixture.destroy_repo(repo_name)
1293 1293
1294 1294 def test_api_delete_repo_exception_occurred(self):
1295 1295 repo_name = 'api_delete_me'
1296 1296 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1297 1297 try:
1298 1298 with mock.patch.object(RepoModel, 'delete', raise_exception):
1299 1299 id_, params = _build_data(self.apikey, 'delete_repo',
1300 1300 repoid=repo_name, )
1301 1301 response = api_call(self, params)
1302 1302
1303 1303 expected = 'failed to delete repository `%s`' % repo_name
1304 1304 self._compare_error(id_, expected, given=response.body)
1305 1305 finally:
1306 1306 fixture.destroy_repo(repo_name)
1307 1307
1308 1308 def test_api_fork_repo(self):
1309 1309 fork_name = 'api-repo-fork'
1310 1310 id_, params = _build_data(self.apikey, 'fork_repo',
1311 1311 repoid=self.REPO,
1312 1312 fork_name=fork_name,
1313 1313 owner=base.TEST_USER_ADMIN_LOGIN,
1314 1314 )
1315 1315 response = api_call(self, params)
1316 1316
1317 1317 ret = {
1318 1318 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
1319 1319 fork_name),
1320 1320 'success': True,
1321 1321 }
1322 1322 expected = ret
1323 1323 self._compare_ok(id_, expected, given=response.body)
1324 1324 fixture.destroy_repo(fork_name)
1325 1325
1326 1326 @base.parametrize('fork_name', [
1327 1327 'api-repo-fork',
1328 1328 '%s/api-repo-fork' % TEST_REPO_GROUP,
1329 1329 ])
1330 1330 def test_api_fork_repo_non_admin(self, fork_name):
1331 1331 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1332 1332 self.TEST_USER_LOGIN,
1333 1333 'group.write')
1334 1334 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1335 1335 repoid=self.REPO,
1336 1336 fork_name=fork_name,
1337 1337 )
1338 1338 response = api_call(self, params)
1339 1339
1340 1340 ret = {
1341 1341 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
1342 1342 fork_name),
1343 1343 'success': True,
1344 1344 }
1345 1345 expected = ret
1346 1346 self._compare_ok(id_, expected, given=response.body)
1347 1347 fixture.destroy_repo(fork_name)
1348 1348
1349 1349 def test_api_fork_repo_non_admin_specify_owner(self):
1350 1350 fork_name = 'api-repo-fork'
1351 1351 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1352 1352 repoid=self.REPO,
1353 1353 fork_name=fork_name,
1354 1354 owner=base.TEST_USER_ADMIN_LOGIN,
1355 1355 )
1356 1356 response = api_call(self, params)
1357 1357 expected = 'Only Kallithea admin can specify `owner` param'
1358 1358 self._compare_error(id_, expected, given=response.body)
1359 1359 fixture.destroy_repo(fork_name)
1360 1360
1361 1361 def test_api_fork_repo_non_admin_no_permission_to_fork(self):
1362 1362 RepoModel().grant_user_permission(repo=self.REPO,
1363 1363 user=db.User.DEFAULT_USER_NAME,
1364 1364 perm='repository.none')
1365 1365 fork_name = 'api-repo-fork'
1366 1366 try:
1367 1367 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1368 1368 repoid=self.REPO,
1369 1369 fork_name=fork_name,
1370 1370 )
1371 1371 response = api_call(self, params)
1372 1372 expected = 'repository `%s` does not exist' % (self.REPO)
1373 1373 self._compare_error(id_, expected, given=response.body)
1374 1374 finally:
1375 1375 RepoModel().grant_user_permission(repo=self.REPO,
1376 1376 user=db.User.DEFAULT_USER_NAME,
1377 1377 perm='repository.read')
1378 1378 fixture.destroy_repo(fork_name)
1379 1379
1380 1380 @base.parametrize('name,perm', [
1381 1381 ('read', 'repository.read'),
1382 1382 ('write', 'repository.write'),
1383 1383 ('admin', 'repository.admin'),
1384 1384 ])
1385 1385 def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
1386 1386 fork_name = 'api-repo-fork'
1387 1387 # regardless of base repository permission, forking is disallowed
1388 1388 # when repository creation is disabled
1389 1389 RepoModel().grant_user_permission(repo=self.REPO,
1390 1390 user=self.TEST_USER_LOGIN,
1391 1391 perm=perm)
1392 1392 UserModel().revoke_perm('default', 'hg.create.repository')
1393 1393 UserModel().grant_perm('default', 'hg.create.none')
1394 1394 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1395 1395 repoid=self.REPO,
1396 1396 fork_name=fork_name,
1397 1397 )
1398 1398 response = api_call(self, params)
1399 1399 expected = 'no permission to create top level repo'
1400 1400 self._compare_error(id_, expected, given=response.body)
1401 1401 fixture.destroy_repo(fork_name)
1402 1402
1403 1403 def test_api_fork_repo_unknown_owner(self):
1404 1404 fork_name = 'api-repo-fork'
1405 1405 owner = 'i-dont-exist'
1406 1406 id_, params = _build_data(self.apikey, 'fork_repo',
1407 1407 repoid=self.REPO,
1408 1408 fork_name=fork_name,
1409 1409 owner=owner,
1410 1410 )
1411 1411 response = api_call(self, params)
1412 1412 expected = 'user `%s` does not exist' % owner
1413 1413 self._compare_error(id_, expected, given=response.body)
1414 1414
1415 1415 def test_api_fork_repo_fork_exists(self):
1416 1416 fork_name = 'api-repo-fork'
1417 1417 fixture.create_fork(self.REPO, fork_name)
1418 1418
1419 1419 try:
1420 1420 fork_name = 'api-repo-fork'
1421 1421
1422 1422 id_, params = _build_data(self.apikey, 'fork_repo',
1423 1423 repoid=self.REPO,
1424 1424 fork_name=fork_name,
1425 1425 owner=base.TEST_USER_ADMIN_LOGIN,
1426 1426 )
1427 1427 response = api_call(self, params)
1428 1428
1429 1429 expected = "fork `%s` already exist" % fork_name
1430 1430 self._compare_error(id_, expected, given=response.body)
1431 1431 finally:
1432 1432 fixture.destroy_repo(fork_name)
1433 1433
1434 1434 def test_api_fork_repo_repo_exists(self):
1435 1435 fork_name = self.REPO
1436 1436
1437 1437 id_, params = _build_data(self.apikey, 'fork_repo',
1438 1438 repoid=self.REPO,
1439 1439 fork_name=fork_name,
1440 1440 owner=base.TEST_USER_ADMIN_LOGIN,
1441 1441 )
1442 1442 response = api_call(self, params)
1443 1443
1444 1444 expected = "repo `%s` already exist" % fork_name
1445 1445 self._compare_error(id_, expected, given=response.body)
1446 1446
1447 1447 @mock.patch.object(RepoModel, 'create_fork', raise_exception)
1448 1448 def test_api_fork_repo_exception_occurred(self):
1449 1449 fork_name = 'api-repo-fork'
1450 1450 id_, params = _build_data(self.apikey, 'fork_repo',
1451 1451 repoid=self.REPO,
1452 1452 fork_name=fork_name,
1453 1453 owner=base.TEST_USER_ADMIN_LOGIN,
1454 1454 )
1455 1455 response = api_call(self, params)
1456 1456
1457 1457 expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
1458 1458 fork_name)
1459 1459 self._compare_error(id_, expected, given=response.body)
1460 1460
1461 1461 def test_api_get_user_group(self):
1462 1462 id_, params = _build_data(self.apikey, 'get_user_group',
1463 1463 usergroupid=TEST_USER_GROUP)
1464 1464 response = api_call(self, params)
1465 1465
1466 1466 user_group = UserGroupModel().get_group(TEST_USER_GROUP)
1467 1467 members = []
1468 1468 for user in user_group.members:
1469 1469 user = user.user
1470 1470 members.append(user.get_api_data())
1471 1471
1472 1472 ret = user_group.get_api_data()
1473 1473 ret['members'] = members
1474 1474 expected = ret
1475 1475 self._compare_ok(id_, expected, given=response.body)
1476 1476
1477 1477 def test_api_get_user_groups(self):
1478 1478 gr_name = 'test_user_group2'
1479 1479 make_user_group(gr_name)
1480 1480
1481 1481 try:
1482 1482 id_, params = _build_data(self.apikey, 'get_user_groups', )
1483 1483 response = api_call(self, params)
1484 1484
1485 1485 expected = []
1486 1486 for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
1487 1487 user_group = UserGroupModel().get_group(gr_name)
1488 1488 ret = user_group.get_api_data()
1489 1489 expected.append(ret)
1490 1490 self._compare_ok(id_, expected, given=response.body)
1491 1491 finally:
1492 1492 fixture.destroy_user_group(gr_name)
1493 1493
1494 1494 def test_api_create_user_group(self):
1495 1495 group_name = 'some_new_group'
1496 1496 id_, params = _build_data(self.apikey, 'create_user_group',
1497 1497 group_name=group_name)
1498 1498 response = api_call(self, params)
1499 1499
1500 1500 ret = {
1501 1501 'msg': 'created new user group `%s`' % group_name,
1502 1502 'user_group': jsonify(UserGroupModel() \
1503 1503 .get_by_name(group_name) \
1504 1504 .get_api_data())
1505 1505 }
1506 1506 expected = ret
1507 1507 self._compare_ok(id_, expected, given=response.body)
1508 1508
1509 1509 fixture.destroy_user_group(group_name)
1510 1510
1511 1511 def test_api_get_user_group_that_exist(self):
1512 1512 id_, params = _build_data(self.apikey, 'create_user_group',
1513 1513 group_name=TEST_USER_GROUP)
1514 1514 response = api_call(self, params)
1515 1515
1516 1516 expected = "user group `%s` already exist" % TEST_USER_GROUP
1517 1517 self._compare_error(id_, expected, given=response.body)
1518 1518
1519 1519 @mock.patch.object(UserGroupModel, 'create', raise_exception)
1520 1520 def test_api_get_user_group_exception_occurred(self):
1521 1521 group_name = 'exception_happens'
1522 1522 id_, params = _build_data(self.apikey, 'create_user_group',
1523 1523 group_name=group_name)
1524 1524 response = api_call(self, params)
1525 1525
1526 1526 expected = 'failed to create group `%s`' % group_name
1527 1527 self._compare_error(id_, expected, given=response.body)
1528 1528
1529 1529 @base.parametrize('changing_attr,updates', [
1530 1530 ('group_name', {'group_name': 'new_group_name'}),
1531 1531 ('group_name', {'group_name': 'test_group_for_update'}),
1532 1532 ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
1533 1533 ('active', {'active': False}),
1534 1534 ('active', {'active': True}),
1535 1535 ])
1536 1536 def test_api_update_user_group(self, changing_attr, updates):
1537 1537 gr_name = 'test_group_for_update'
1538 1538 user_group = fixture.create_user_group(gr_name)
1539 1539 try:
1540 1540 id_, params = _build_data(self.apikey, 'update_user_group',
1541 1541 usergroupid=gr_name, **updates)
1542 1542 response = api_call(self, params)
1543 1543 expected = {
1544 1544 'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
1545 1545 user_group.users_group_name),
1546 1546 'user_group': user_group.get_api_data()
1547 1547 }
1548 1548 self._compare_ok(id_, expected, given=response.body)
1549 1549 finally:
1550 1550 if changing_attr == 'group_name':
1551 1551 # switch to updated name for proper cleanup
1552 1552 gr_name = updates['group_name']
1553 1553 fixture.destroy_user_group(gr_name)
1554 1554
1555 1555 @mock.patch.object(UserGroupModel, 'update', raise_exception)
1556 1556 def test_api_update_user_group_exception_occurred(self):
1557 1557 gr_name = 'test_group'
1558 1558 fixture.create_user_group(gr_name)
1559 1559 try:
1560 1560 id_, params = _build_data(self.apikey, 'update_user_group',
1561 1561 usergroupid=gr_name)
1562 1562 response = api_call(self, params)
1563 1563 expected = 'failed to update user group `%s`' % gr_name
1564 1564 self._compare_error(id_, expected, given=response.body)
1565 1565 finally:
1566 1566 fixture.destroy_user_group(gr_name)
1567 1567
1568 1568 def test_api_add_user_to_user_group(self):
1569 1569 gr_name = 'test_group'
1570 1570 fixture.create_user_group(gr_name)
1571 1571 try:
1572 1572 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1573 1573 usergroupid=gr_name,
1574 1574 userid=base.TEST_USER_ADMIN_LOGIN)
1575 1575 response = api_call(self, params)
1576 1576 expected = {
1577 1577 'msg': 'added member `%s` to user group `%s`' % (
1578 1578 base.TEST_USER_ADMIN_LOGIN, gr_name),
1579 1579 'success': True
1580 1580 }
1581 1581 self._compare_ok(id_, expected, given=response.body)
1582 1582 finally:
1583 1583 fixture.destroy_user_group(gr_name)
1584 1584
1585 1585 def test_api_add_user_to_user_group_that_doesnt_exist(self):
1586 1586 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1587 1587 usergroupid='false-group',
1588 1588 userid=base.TEST_USER_ADMIN_LOGIN)
1589 1589 response = api_call(self, params)
1590 1590
1591 1591 expected = 'user group `%s` does not exist' % 'false-group'
1592 1592 self._compare_error(id_, expected, given=response.body)
1593 1593
1594 1594 @mock.patch.object(UserGroupModel, 'add_user_to_group', raise_exception)
1595 1595 def test_api_add_user_to_user_group_exception_occurred(self):
1596 1596 gr_name = 'test_group'
1597 1597 fixture.create_user_group(gr_name)
1598 1598 try:
1599 1599 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1600 1600 usergroupid=gr_name,
1601 1601 userid=base.TEST_USER_ADMIN_LOGIN)
1602 1602 response = api_call(self, params)
1603 1603 expected = 'failed to add member to user group `%s`' % gr_name
1604 1604 self._compare_error(id_, expected, given=response.body)
1605 1605 finally:
1606 1606 fixture.destroy_user_group(gr_name)
1607 1607
1608 1608 def test_api_remove_user_from_user_group(self):
1609 1609 gr_name = 'test_group_3'
1610 1610 gr = fixture.create_user_group(gr_name)
1611 1611 UserGroupModel().add_user_to_group(gr, user=base.TEST_USER_ADMIN_LOGIN)
1612 1612 meta.Session().commit()
1613 1613 try:
1614 1614 id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
1615 1615 usergroupid=gr_name,
1616 1616 userid=base.TEST_USER_ADMIN_LOGIN)
1617 1617 response = api_call(self, params)
1618 1618 expected = {
1619 1619 'msg': 'removed member `%s` from user group `%s`' % (
1620 1620 base.TEST_USER_ADMIN_LOGIN, gr_name
1621 1621 ),
1622 1622 'success': True}
1623 1623 self._compare_ok(id_, expected, given=response.body)
1624 1624 finally:
1625 1625 fixture.destroy_user_group(gr_name)
1626 1626
1627 1627 @mock.patch.object(UserGroupModel, 'remove_user_from_group', raise_exception)
1628 1628 def test_api_remove_user_from_user_group_exception_occurred(self):
1629 1629 gr_name = 'test_group_3'
1630 1630 gr = fixture.create_user_group(gr_name)
1631 1631 UserGroupModel().add_user_to_group(gr, user=base.TEST_USER_ADMIN_LOGIN)
1632 1632 meta.Session().commit()
1633 1633 try:
1634 1634 id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
1635 1635 usergroupid=gr_name,
1636 1636 userid=base.TEST_USER_ADMIN_LOGIN)
1637 1637 response = api_call(self, params)
1638 1638 expected = 'failed to remove member from user group `%s`' % gr_name
1639 1639 self._compare_error(id_, expected, given=response.body)
1640 1640 finally:
1641 1641 fixture.destroy_user_group(gr_name)
1642 1642
1643 1643 def test_api_delete_user_group(self):
1644 1644 gr_name = 'test_group'
1645 1645 ugroup = fixture.create_user_group(gr_name)
1646 1646 gr_id = ugroup.users_group_id
1647 1647 try:
1648 1648 id_, params = _build_data(self.apikey, 'delete_user_group',
1649 1649 usergroupid=gr_name)
1650 1650 response = api_call(self, params)
1651 1651 expected = {
1652 1652 'user_group': None,
1653 1653 'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name)
1654 1654 }
1655 1655 self._compare_ok(id_, expected, given=response.body)
1656 1656 finally:
1657 1657 if UserGroupModel().get_by_name(gr_name):
1658 1658 fixture.destroy_user_group(gr_name)
1659 1659
1660 1660 def test_api_delete_user_group_that_is_assigned(self):
1661 1661 gr_name = 'test_group'
1662 1662 ugroup = fixture.create_user_group(gr_name)
1663 1663 gr_id = ugroup.users_group_id
1664 1664
1665 1665 ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write')
1666 1666 msg = 'User Group assigned to %s' % ugr_to_perm.repository.repo_name
1667 1667
1668 1668 try:
1669 1669 id_, params = _build_data(self.apikey, 'delete_user_group',
1670 1670 usergroupid=gr_name)
1671 1671 response = api_call(self, params)
1672 1672 expected = msg
1673 1673 self._compare_error(id_, expected, given=response.body)
1674 1674 finally:
1675 1675 if UserGroupModel().get_by_name(gr_name):
1676 1676 fixture.destroy_user_group(gr_name)
1677 1677
1678 1678 def test_api_delete_user_group_exception_occurred(self):
1679 1679 gr_name = 'test_group'
1680 1680 ugroup = fixture.create_user_group(gr_name)
1681 1681 gr_id = ugroup.users_group_id
1682 1682 id_, params = _build_data(self.apikey, 'delete_user_group',
1683 1683 usergroupid=gr_name)
1684 1684
1685 1685 try:
1686 1686 with mock.patch.object(UserGroupModel, 'delete', raise_exception):
1687 1687 response = api_call(self, params)
1688 1688 expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
1689 1689 self._compare_error(id_, expected, given=response.body)
1690 1690 finally:
1691 1691 fixture.destroy_user_group(gr_name)
1692 1692
1693 1693 @base.parametrize('name,perm', [
1694 1694 ('none', 'repository.none'),
1695 1695 ('read', 'repository.read'),
1696 1696 ('write', 'repository.write'),
1697 1697 ('admin', 'repository.admin'),
1698 1698 ])
1699 1699 def test_api_grant_user_permission(self, name, perm):
1700 1700 id_, params = _build_data(self.apikey,
1701 1701 'grant_user_permission',
1702 1702 repoid=self.REPO,
1703 1703 userid=base.TEST_USER_ADMIN_LOGIN,
1704 1704 perm=perm)
1705 1705 response = api_call(self, params)
1706 1706
1707 1707 ret = {
1708 1708 'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
1709 1709 perm, base.TEST_USER_ADMIN_LOGIN, self.REPO
1710 1710 ),
1711 1711 'success': True
1712 1712 }
1713 1713 expected = ret
1714 1714 self._compare_ok(id_, expected, given=response.body)
1715 1715
1716 1716 def test_api_grant_user_permission_wrong_permission(self):
1717 1717 perm = 'haha.no.permission'
1718 1718 id_, params = _build_data(self.apikey,
1719 1719 'grant_user_permission',
1720 1720 repoid=self.REPO,
1721 1721 userid=base.TEST_USER_ADMIN_LOGIN,
1722 1722 perm=perm)
1723 1723 response = api_call(self, params)
1724 1724
1725 1725 expected = 'permission `%s` does not exist' % perm
1726 1726 self._compare_error(id_, expected, given=response.body)
1727 1727
1728 1728 @mock.patch.object(RepoModel, 'grant_user_permission', raise_exception)
1729 1729 def test_api_grant_user_permission_exception_when_adding(self):
1730 1730 perm = 'repository.read'
1731 1731 id_, params = _build_data(self.apikey,
1732 1732 'grant_user_permission',
1733 1733 repoid=self.REPO,
1734 1734 userid=base.TEST_USER_ADMIN_LOGIN,
1735 1735 perm=perm)
1736 1736 response = api_call(self, params)
1737 1737
1738 1738 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1739 1739 base.TEST_USER_ADMIN_LOGIN, self.REPO
1740 1740 )
1741 1741 self._compare_error(id_, expected, given=response.body)
1742 1742
1743 1743 def test_api_revoke_user_permission(self):
1744 1744 id_, params = _build_data(self.apikey,
1745 1745 'revoke_user_permission',
1746 1746 repoid=self.REPO,
1747 1747 userid=base.TEST_USER_ADMIN_LOGIN, )
1748 1748 response = api_call(self, params)
1749 1749
1750 1750 expected = {
1751 1751 'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
1752 1752 base.TEST_USER_ADMIN_LOGIN, self.REPO
1753 1753 ),
1754 1754 'success': True
1755 1755 }
1756 1756 self._compare_ok(id_, expected, given=response.body)
1757 1757
1758 1758 @mock.patch.object(RepoModel, 'revoke_user_permission', raise_exception)
1759 1759 def test_api_revoke_user_permission_exception_when_adding(self):
1760 1760 id_, params = _build_data(self.apikey,
1761 1761 'revoke_user_permission',
1762 1762 repoid=self.REPO,
1763 1763 userid=base.TEST_USER_ADMIN_LOGIN, )
1764 1764 response = api_call(self, params)
1765 1765
1766 1766 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1767 1767 base.TEST_USER_ADMIN_LOGIN, self.REPO
1768 1768 )
1769 1769 self._compare_error(id_, expected, given=response.body)
1770 1770
1771 1771 @base.parametrize('name,perm', [
1772 1772 ('none', 'repository.none'),
1773 1773 ('read', 'repository.read'),
1774 1774 ('write', 'repository.write'),
1775 1775 ('admin', 'repository.admin'),
1776 1776 ])
1777 1777 def test_api_grant_user_group_permission(self, name, perm):
1778 1778 id_, params = _build_data(self.apikey,
1779 1779 'grant_user_group_permission',
1780 1780 repoid=self.REPO,
1781 1781 usergroupid=TEST_USER_GROUP,
1782 1782 perm=perm)
1783 1783 response = api_call(self, params)
1784 1784
1785 1785 ret = {
1786 1786 'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % (
1787 1787 perm, TEST_USER_GROUP, self.REPO
1788 1788 ),
1789 1789 'success': True
1790 1790 }
1791 1791 expected = ret
1792 1792 self._compare_ok(id_, expected, given=response.body)
1793 1793
1794 1794 def test_api_grant_user_group_permission_wrong_permission(self):
1795 1795 perm = 'haha.no.permission'
1796 1796 id_, params = _build_data(self.apikey,
1797 1797 'grant_user_group_permission',
1798 1798 repoid=self.REPO,
1799 1799 usergroupid=TEST_USER_GROUP,
1800 1800 perm=perm)
1801 1801 response = api_call(self, params)
1802 1802
1803 1803 expected = 'permission `%s` does not exist' % perm
1804 1804 self._compare_error(id_, expected, given=response.body)
1805 1805
1806 1806 @mock.patch.object(RepoModel, 'grant_user_group_permission', raise_exception)
1807 1807 def test_api_grant_user_group_permission_exception_when_adding(self):
1808 1808 perm = 'repository.read'
1809 1809 id_, params = _build_data(self.apikey,
1810 1810 'grant_user_group_permission',
1811 1811 repoid=self.REPO,
1812 1812 usergroupid=TEST_USER_GROUP,
1813 1813 perm=perm)
1814 1814 response = api_call(self, params)
1815 1815
1816 1816 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1817 1817 TEST_USER_GROUP, self.REPO
1818 1818 )
1819 1819 self._compare_error(id_, expected, given=response.body)
1820 1820
1821 1821 def test_api_revoke_user_group_permission(self):
1822 1822 RepoModel().grant_user_group_permission(repo=self.REPO,
1823 1823 group_name=TEST_USER_GROUP,
1824 1824 perm='repository.read')
1825 1825 meta.Session().commit()
1826 1826 id_, params = _build_data(self.apikey,
1827 1827 'revoke_user_group_permission',
1828 1828 repoid=self.REPO,
1829 1829 usergroupid=TEST_USER_GROUP, )
1830 1830 response = api_call(self, params)
1831 1831
1832 1832 expected = {
1833 1833 'msg': 'Revoked perm for user group: `%s` in repo: `%s`' % (
1834 1834 TEST_USER_GROUP, self.REPO
1835 1835 ),
1836 1836 'success': True
1837 1837 }
1838 1838 self._compare_ok(id_, expected, given=response.body)
1839 1839
1840 1840 @mock.patch.object(RepoModel, 'revoke_user_group_permission', raise_exception)
1841 1841 def test_api_revoke_user_group_permission_exception_when_adding(self):
1842 1842 id_, params = _build_data(self.apikey,
1843 1843 'revoke_user_group_permission',
1844 1844 repoid=self.REPO,
1845 1845 usergroupid=TEST_USER_GROUP, )
1846 1846 response = api_call(self, params)
1847 1847
1848 1848 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1849 1849 TEST_USER_GROUP, self.REPO
1850 1850 )
1851 1851 self._compare_error(id_, expected, given=response.body)
1852 1852
1853 1853 @base.parametrize('changing_attr,updates', [
1854 1854 #('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}), # currently broken
1855 1855 ('description', {'description': 'new description'}),
1856 1856 ('group_name', {'group_name': 'new_repo_name'}),
1857 #('parent', {'parent': 'test_group_for_update'}), # currently broken
1857 ('parent', {'parent': 'test_group_for_update'}),
1858 1858 ])
1859 1859 def test_api_update_repo_group(self, changing_attr, updates):
1860 1860 group_name = 'lololo'
1861 1861 repo_group = fixture.create_repo_group(group_name)
1862 1862
1863 1863 new_group_name = group_name
1864 1864 if changing_attr == 'group_name':
1865 1865 assert repo_group.parent_group_id is None # lazy assumption for this test
1866 1866 new_group_name = updates['group_name']
1867 1867 if changing_attr == 'parent':
1868 1868 new_group_name = '/'.join([updates['parent'], group_name.rsplit('/', 1)[-1]])
1869 1869
1870 1870 expected = {
1871 1871 'msg': 'updated repository group ID:%s %s' % (repo_group.group_id, new_group_name),
1872 1872 'repo_group': repo_group.get_api_data()
1873 1873 }
1874 1874 expected['repo_group'].update(updates)
1875 1875 if 'description' in updates:
1876 1876 expected['repo_group']['group_description'] = expected['repo_group'].pop('description')
1877 1877
1878 1878 if changing_attr == 'parent':
1879 1879 new_parent = fixture.create_repo_group(updates['parent'])
1880 1880 expected['repo_group']['parent_group'] = expected['repo_group'].pop('parent')
1881 1881 expected['repo_group']['group_name'] = new_group_name
1882 1882
1883 1883 id_, params = _build_data(self.apikey, 'update_repo_group',
1884 1884 repogroupid=group_name, **updates)
1885 1885 response = api_call(self, params)
1886 1886
1887 1887 try:
1888 1888 self._compare_ok(id_, expected, given=response.body)
1889 1889 finally:
1890 1890 if changing_attr == 'parent':
1891 1891 fixture.destroy_repo_group(new_parent.group_id)
1892 1892 fixture.destroy_repo_group(new_group_name)
1893 1893
1894 1894 @base.parametrize('name,perm,apply_to_children', [
1895 1895 ('none', 'group.none', 'none'),
1896 1896 ('read', 'group.read', 'none'),
1897 1897 ('write', 'group.write', 'none'),
1898 1898 ('admin', 'group.admin', 'none'),
1899 1899
1900 1900 ('none', 'group.none', 'all'),
1901 1901 ('read', 'group.read', 'all'),
1902 1902 ('write', 'group.write', 'all'),
1903 1903 ('admin', 'group.admin', 'all'),
1904 1904
1905 1905 ('none', 'group.none', 'repos'),
1906 1906 ('read', 'group.read', 'repos'),
1907 1907 ('write', 'group.write', 'repos'),
1908 1908 ('admin', 'group.admin', 'repos'),
1909 1909
1910 1910 ('none', 'group.none', 'groups'),
1911 1911 ('read', 'group.read', 'groups'),
1912 1912 ('write', 'group.write', 'groups'),
1913 1913 ('admin', 'group.admin', 'groups'),
1914 1914 ])
1915 1915 def test_api_grant_user_permission_to_repo_group(self, name, perm, apply_to_children):
1916 1916 id_, params = _build_data(self.apikey,
1917 1917 'grant_user_permission_to_repo_group',
1918 1918 repogroupid=TEST_REPO_GROUP,
1919 1919 userid=base.TEST_USER_ADMIN_LOGIN,
1920 1920 perm=perm, apply_to_children=apply_to_children)
1921 1921 response = api_call(self, params)
1922 1922
1923 1923 ret = {
1924 1924 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1925 1925 perm, apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1926 1926 ),
1927 1927 'success': True
1928 1928 }
1929 1929 expected = ret
1930 1930 self._compare_ok(id_, expected, given=response.body)
1931 1931
1932 1932 @base.parametrize('name,perm,apply_to_children,grant_admin,access_ok', [
1933 1933 ('none_fails', 'group.none', 'none', False, False),
1934 1934 ('read_fails', 'group.read', 'none', False, False),
1935 1935 ('write_fails', 'group.write', 'none', False, False),
1936 1936 ('admin_fails', 'group.admin', 'none', False, False),
1937 1937
1938 1938 # with granted perms
1939 1939 ('none_ok', 'group.none', 'none', True, True),
1940 1940 ('read_ok', 'group.read', 'none', True, True),
1941 1941 ('write_ok', 'group.write', 'none', True, True),
1942 1942 ('admin_ok', 'group.admin', 'none', True, True),
1943 1943 ])
1944 1944 def test_api_grant_user_permission_to_repo_group_by_regular_user(
1945 1945 self, name, perm, apply_to_children, grant_admin, access_ok):
1946 1946 if grant_admin:
1947 1947 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1948 1948 self.TEST_USER_LOGIN,
1949 1949 'group.admin')
1950 1950 meta.Session().commit()
1951 1951
1952 1952 id_, params = _build_data(self.apikey_regular,
1953 1953 'grant_user_permission_to_repo_group',
1954 1954 repogroupid=TEST_REPO_GROUP,
1955 1955 userid=base.TEST_USER_ADMIN_LOGIN,
1956 1956 perm=perm, apply_to_children=apply_to_children)
1957 1957 response = api_call(self, params)
1958 1958 if access_ok:
1959 1959 ret = {
1960 1960 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1961 1961 perm, apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1962 1962 ),
1963 1963 'success': True
1964 1964 }
1965 1965 expected = ret
1966 1966 self._compare_ok(id_, expected, given=response.body)
1967 1967 else:
1968 1968 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
1969 1969 self._compare_error(id_, expected, given=response.body)
1970 1970
1971 1971 def test_api_grant_user_permission_to_repo_group_wrong_permission(self):
1972 1972 perm = 'haha.no.permission'
1973 1973 id_, params = _build_data(self.apikey,
1974 1974 'grant_user_permission_to_repo_group',
1975 1975 repogroupid=TEST_REPO_GROUP,
1976 1976 userid=base.TEST_USER_ADMIN_LOGIN,
1977 1977 perm=perm)
1978 1978 response = api_call(self, params)
1979 1979
1980 1980 expected = 'permission `%s` does not exist' % perm
1981 1981 self._compare_error(id_, expected, given=response.body)
1982 1982
1983 1983 @mock.patch.object(RepoGroupModel, 'grant_user_permission', raise_exception)
1984 1984 def test_api_grant_user_permission_to_repo_group_exception_when_adding(self):
1985 1985 perm = 'group.read'
1986 1986 id_, params = _build_data(self.apikey,
1987 1987 'grant_user_permission_to_repo_group',
1988 1988 repogroupid=TEST_REPO_GROUP,
1989 1989 userid=base.TEST_USER_ADMIN_LOGIN,
1990 1990 perm=perm)
1991 1991 response = api_call(self, params)
1992 1992
1993 1993 expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
1994 1994 base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1995 1995 )
1996 1996 self._compare_error(id_, expected, given=response.body)
1997 1997
1998 1998 @base.parametrize('name,apply_to_children', [
1999 1999 ('none', 'none'),
2000 2000 ('all', 'all'),
2001 2001 ('repos', 'repos'),
2002 2002 ('groups', 'groups'),
2003 2003 ])
2004 2004 def test_api_revoke_user_permission_from_repo_group(self, name, apply_to_children):
2005 2005 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
2006 2006 user=base.TEST_USER_ADMIN_LOGIN,
2007 2007 perm='group.read',)
2008 2008 meta.Session().commit()
2009 2009
2010 2010 id_, params = _build_data(self.apikey,
2011 2011 'revoke_user_permission_from_repo_group',
2012 2012 repogroupid=TEST_REPO_GROUP,
2013 2013 userid=base.TEST_USER_ADMIN_LOGIN,
2014 2014 apply_to_children=apply_to_children,)
2015 2015 response = api_call(self, params)
2016 2016
2017 2017 expected = {
2018 2018 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
2019 2019 apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2020 2020 ),
2021 2021 'success': True
2022 2022 }
2023 2023 self._compare_ok(id_, expected, given=response.body)
2024 2024
2025 2025 @base.parametrize('name,apply_to_children,grant_admin,access_ok', [
2026 2026 ('none', 'none', False, False),
2027 2027 ('all', 'all', False, False),
2028 2028 ('repos', 'repos', False, False),
2029 2029 ('groups', 'groups', False, False),
2030 2030
2031 2031 # after granting admin rights
2032 2032 ('none', 'none', False, False),
2033 2033 ('all', 'all', False, False),
2034 2034 ('repos', 'repos', False, False),
2035 2035 ('groups', 'groups', False, False),
2036 2036 ])
2037 2037 def test_api_revoke_user_permission_from_repo_group_by_regular_user(
2038 2038 self, name, apply_to_children, grant_admin, access_ok):
2039 2039 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
2040 2040 user=base.TEST_USER_ADMIN_LOGIN,
2041 2041 perm='group.read',)
2042 2042 meta.Session().commit()
2043 2043
2044 2044 if grant_admin:
2045 2045 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
2046 2046 self.TEST_USER_LOGIN,
2047 2047 'group.admin')
2048 2048 meta.Session().commit()
2049 2049
2050 2050 id_, params = _build_data(self.apikey_regular,
2051 2051 'revoke_user_permission_from_repo_group',
2052 2052 repogroupid=TEST_REPO_GROUP,
2053 2053 userid=base.TEST_USER_ADMIN_LOGIN,
2054 2054 apply_to_children=apply_to_children,)
2055 2055 response = api_call(self, params)
2056 2056 if access_ok:
2057 2057 expected = {
2058 2058 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
2059 2059 apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2060 2060 ),
2061 2061 'success': True
2062 2062 }
2063 2063 self._compare_ok(id_, expected, given=response.body)
2064 2064 else:
2065 2065 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2066 2066 self._compare_error(id_, expected, given=response.body)
2067 2067
2068 2068 @mock.patch.object(RepoGroupModel, 'revoke_user_permission', raise_exception)
2069 2069 def test_api_revoke_user_permission_from_repo_group_exception_when_adding(self):
2070 2070 id_, params = _build_data(self.apikey,
2071 2071 'revoke_user_permission_from_repo_group',
2072 2072 repogroupid=TEST_REPO_GROUP,
2073 2073 userid=base.TEST_USER_ADMIN_LOGIN, )
2074 2074 response = api_call(self, params)
2075 2075
2076 2076 expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
2077 2077 base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2078 2078 )
2079 2079 self._compare_error(id_, expected, given=response.body)
2080 2080
2081 2081 @base.parametrize('name,perm,apply_to_children', [
2082 2082 ('none', 'group.none', 'none'),
2083 2083 ('read', 'group.read', 'none'),
2084 2084 ('write', 'group.write', 'none'),
2085 2085 ('admin', 'group.admin', 'none'),
2086 2086
2087 2087 ('none', 'group.none', 'all'),
2088 2088 ('read', 'group.read', 'all'),
2089 2089 ('write', 'group.write', 'all'),
2090 2090 ('admin', 'group.admin', 'all'),
2091 2091
2092 2092 ('none', 'group.none', 'repos'),
2093 2093 ('read', 'group.read', 'repos'),
2094 2094 ('write', 'group.write', 'repos'),
2095 2095 ('admin', 'group.admin', 'repos'),
2096 2096
2097 2097 ('none', 'group.none', 'groups'),
2098 2098 ('read', 'group.read', 'groups'),
2099 2099 ('write', 'group.write', 'groups'),
2100 2100 ('admin', 'group.admin', 'groups'),
2101 2101 ])
2102 2102 def test_api_grant_user_group_permission_to_repo_group(self, name, perm, apply_to_children):
2103 2103 id_, params = _build_data(self.apikey,
2104 2104 'grant_user_group_permission_to_repo_group',
2105 2105 repogroupid=TEST_REPO_GROUP,
2106 2106 usergroupid=TEST_USER_GROUP,
2107 2107 perm=perm,
2108 2108 apply_to_children=apply_to_children,)
2109 2109 response = api_call(self, params)
2110 2110
2111 2111 ret = {
2112 2112 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2113 2113 perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2114 2114 ),
2115 2115 'success': True
2116 2116 }
2117 2117 expected = ret
2118 2118 self._compare_ok(id_, expected, given=response.body)
2119 2119
2120 2120 @base.parametrize('name,perm,apply_to_children,grant_admin,access_ok', [
2121 2121 ('none_fails', 'group.none', 'none', False, False),
2122 2122 ('read_fails', 'group.read', 'none', False, False),
2123 2123 ('write_fails', 'group.write', 'none', False, False),
2124 2124 ('admin_fails', 'group.admin', 'none', False, False),
2125 2125
2126 2126 # with granted perms
2127 2127 ('none_ok', 'group.none', 'none', True, True),
2128 2128 ('read_ok', 'group.read', 'none', True, True),
2129 2129 ('write_ok', 'group.write', 'none', True, True),
2130 2130 ('admin_ok', 'group.admin', 'none', True, True),
2131 2131 ])
2132 2132 def test_api_grant_user_group_permission_to_repo_group_by_regular_user(
2133 2133 self, name, perm, apply_to_children, grant_admin, access_ok):
2134 2134 if grant_admin:
2135 2135 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
2136 2136 self.TEST_USER_LOGIN,
2137 2137 'group.admin')
2138 2138 meta.Session().commit()
2139 2139
2140 2140 id_, params = _build_data(self.apikey_regular,
2141 2141 'grant_user_group_permission_to_repo_group',
2142 2142 repogroupid=TEST_REPO_GROUP,
2143 2143 usergroupid=TEST_USER_GROUP,
2144 2144 perm=perm,
2145 2145 apply_to_children=apply_to_children,)
2146 2146 response = api_call(self, params)
2147 2147 if access_ok:
2148 2148 ret = {
2149 2149 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2150 2150 perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2151 2151 ),
2152 2152 'success': True
2153 2153 }
2154 2154 expected = ret
2155 2155 self._compare_ok(id_, expected, given=response.body)
2156 2156 else:
2157 2157 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2158 2158 self._compare_error(id_, expected, given=response.body)
2159 2159
2160 2160 def test_api_grant_user_group_permission_to_repo_group_wrong_permission(self):
2161 2161 perm = 'haha.no.permission'
2162 2162 id_, params = _build_data(self.apikey,
2163 2163 'grant_user_group_permission_to_repo_group',
2164 2164 repogroupid=TEST_REPO_GROUP,
2165 2165 usergroupid=TEST_USER_GROUP,
2166 2166 perm=perm)
2167 2167 response = api_call(self, params)
2168 2168
2169 2169 expected = 'permission `%s` does not exist' % perm
2170 2170 self._compare_error(id_, expected, given=response.body)
2171 2171
2172 2172 @mock.patch.object(RepoGroupModel, 'grant_user_group_permission', raise_exception)
2173 2173 def test_api_grant_user_group_permission_exception_when_adding_to_repo_group(self):
2174 2174 perm = 'group.read'
2175 2175 id_, params = _build_data(self.apikey,
2176 2176 'grant_user_group_permission_to_repo_group',
2177 2177 repogroupid=TEST_REPO_GROUP,
2178 2178 usergroupid=TEST_USER_GROUP,
2179 2179 perm=perm)
2180 2180 response = api_call(self, params)
2181 2181
2182 2182 expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2183 2183 TEST_USER_GROUP, TEST_REPO_GROUP
2184 2184 )
2185 2185 self._compare_error(id_, expected, given=response.body)
2186 2186
2187 2187 @base.parametrize('name,apply_to_children', [
2188 2188 ('none', 'none'),
2189 2189 ('all', 'all'),
2190 2190 ('repos', 'repos'),
2191 2191 ('groups', 'groups'),
2192 2192 ])
2193 2193 def test_api_revoke_user_group_permission_from_repo_group(self, name, apply_to_children):
2194 2194 RepoGroupModel().grant_user_group_permission(repo_group=TEST_REPO_GROUP,
2195 2195 group_name=TEST_USER_GROUP,
2196 2196 perm='group.read',)
2197 2197 meta.Session().commit()
2198 2198 id_, params = _build_data(self.apikey,
2199 2199 'revoke_user_group_permission_from_repo_group',
2200 2200 repogroupid=TEST_REPO_GROUP,
2201 2201 usergroupid=TEST_USER_GROUP,
2202 2202 apply_to_children=apply_to_children,)
2203 2203 response = api_call(self, params)
2204 2204
2205 2205 expected = {
2206 2206 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2207 2207 apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2208 2208 ),
2209 2209 'success': True
2210 2210 }
2211 2211 self._compare_ok(id_, expected, given=response.body)
2212 2212
2213 2213 @base.parametrize('name,apply_to_children,grant_admin,access_ok', [
2214 2214 ('none', 'none', False, False),
2215 2215 ('all', 'all', False, False),
2216 2216 ('repos', 'repos', False, False),
2217 2217 ('groups', 'groups', False, False),
2218 2218
2219 2219 # after granting admin rights
2220 2220 ('none', 'none', False, False),
2221 2221 ('all', 'all', False, False),
2222 2222 ('repos', 'repos', False, False),
2223 2223 ('groups', 'groups', False, False),
2224 2224 ])
2225 2225 def test_api_revoke_user_group_permission_from_repo_group_by_regular_user(
2226 2226 self, name, apply_to_children, grant_admin, access_ok):
2227 2227 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
2228 2228 user=base.TEST_USER_ADMIN_LOGIN,
2229 2229 perm='group.read',)
2230 2230 meta.Session().commit()
2231 2231
2232 2232 if grant_admin:
2233 2233 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
2234 2234 self.TEST_USER_LOGIN,
2235 2235 'group.admin')
2236 2236 meta.Session().commit()
2237 2237
2238 2238 id_, params = _build_data(self.apikey_regular,
2239 2239 'revoke_user_group_permission_from_repo_group',
2240 2240 repogroupid=TEST_REPO_GROUP,
2241 2241 usergroupid=TEST_USER_GROUP,
2242 2242 apply_to_children=apply_to_children,)
2243 2243 response = api_call(self, params)
2244 2244 if access_ok:
2245 2245 expected = {
2246 2246 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2247 2247 apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2248 2248 ),
2249 2249 'success': True
2250 2250 }
2251 2251 self._compare_ok(id_, expected, given=response.body)
2252 2252 else:
2253 2253 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2254 2254 self._compare_error(id_, expected, given=response.body)
2255 2255
2256 2256 @mock.patch.object(RepoGroupModel, 'revoke_user_group_permission', raise_exception)
2257 2257 def test_api_revoke_user_group_permission_from_repo_group_exception_when_adding(self):
2258 2258 id_, params = _build_data(self.apikey, 'revoke_user_group_permission_from_repo_group',
2259 2259 repogroupid=TEST_REPO_GROUP,
2260 2260 usergroupid=TEST_USER_GROUP,)
2261 2261 response = api_call(self, params)
2262 2262
2263 2263 expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2264 2264 TEST_USER_GROUP, TEST_REPO_GROUP
2265 2265 )
2266 2266 self._compare_error(id_, expected, given=response.body)
2267 2267
2268 2268 def test_api_get_gist(self):
2269 2269 gist = fixture.create_gist()
2270 2270 gist_id = gist.gist_access_id
2271 2271 gist_created_on = gist.created_on
2272 2272 id_, params = _build_data(self.apikey, 'get_gist',
2273 2273 gistid=gist_id, )
2274 2274 response = api_call(self, params)
2275 2275
2276 2276 expected = {
2277 2277 'access_id': gist_id,
2278 2278 'created_on': gist_created_on,
2279 2279 'description': 'new-gist',
2280 2280 'expires': -1.0,
2281 2281 'gist_id': int(gist_id),
2282 2282 'type': 'public',
2283 2283 'url': 'http://localhost:80/_admin/gists/%s' % gist_id
2284 2284 }
2285 2285
2286 2286 self._compare_ok(id_, expected, given=response.body)
2287 2287
2288 2288 def test_api_get_gist_that_does_not_exist(self):
2289 2289 id_, params = _build_data(self.apikey_regular, 'get_gist',
2290 2290 gistid='12345', )
2291 2291 response = api_call(self, params)
2292 2292 expected = 'gist `%s` does not exist' % ('12345',)
2293 2293 self._compare_error(id_, expected, given=response.body)
2294 2294
2295 2295 def test_api_get_gist_private_gist_without_permission(self):
2296 2296 gist = fixture.create_gist()
2297 2297 gist_id = gist.gist_access_id
2298 2298 gist_created_on = gist.created_on
2299 2299 id_, params = _build_data(self.apikey_regular, 'get_gist',
2300 2300 gistid=gist_id, )
2301 2301 response = api_call(self, params)
2302 2302
2303 2303 expected = 'gist `%s` does not exist' % gist_id
2304 2304 self._compare_error(id_, expected, given=response.body)
2305 2305
2306 2306 def test_api_get_gists(self):
2307 2307 fixture.create_gist()
2308 2308 fixture.create_gist()
2309 2309
2310 2310 id_, params = _build_data(self.apikey, 'get_gists')
2311 2311 response = api_call(self, params)
2312 2312 expected = response.json
2313 2313 assert len(response.json['result']) == 2
2314 2314 #self._compare_ok(id_, expected, given=response.body)
2315 2315
2316 2316 def test_api_get_gists_regular_user(self):
2317 2317 # by admin
2318 2318 fixture.create_gist()
2319 2319 fixture.create_gist()
2320 2320
2321 2321 # by reg user
2322 2322 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2323 2323 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2324 2324 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2325 2325
2326 2326 id_, params = _build_data(self.apikey_regular, 'get_gists')
2327 2327 response = api_call(self, params)
2328 2328 expected = response.json
2329 2329 assert len(response.json['result']) == 3
2330 2330 #self._compare_ok(id_, expected, given=response.body)
2331 2331
2332 2332 def test_api_get_gists_only_for_regular_user(self):
2333 2333 # by admin
2334 2334 fixture.create_gist()
2335 2335 fixture.create_gist()
2336 2336
2337 2337 # by reg user
2338 2338 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2339 2339 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2340 2340 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2341 2341
2342 2342 id_, params = _build_data(self.apikey, 'get_gists',
2343 2343 userid=self.TEST_USER_LOGIN)
2344 2344 response = api_call(self, params)
2345 2345 expected = response.json
2346 2346 assert len(response.json['result']) == 3
2347 2347 #self._compare_ok(id_, expected, given=response.body)
2348 2348
2349 2349 def test_api_get_gists_regular_user_with_different_userid(self):
2350 2350 id_, params = _build_data(self.apikey_regular, 'get_gists',
2351 2351 userid=base.TEST_USER_ADMIN_LOGIN)
2352 2352 response = api_call(self, params)
2353 2353 expected = 'userid is not the same as your user'
2354 2354 self._compare_error(id_, expected, given=response.body)
2355 2355
2356 2356 def test_api_create_gist(self):
2357 2357 id_, params = _build_data(self.apikey_regular, 'create_gist',
2358 2358 lifetime=10,
2359 2359 description='foobar-gist',
2360 2360 gist_type='public',
2361 2361 files={'foobar': {'content': 'foo'}})
2362 2362 response = api_call(self, params)
2363 2363 expected = {
2364 2364 'gist': {
2365 2365 'access_id': response.json['result']['gist']['access_id'],
2366 2366 'created_on': response.json['result']['gist']['created_on'],
2367 2367 'description': 'foobar-gist',
2368 2368 'expires': response.json['result']['gist']['expires'],
2369 2369 'gist_id': response.json['result']['gist']['gist_id'],
2370 2370 'type': 'public',
2371 2371 'url': response.json['result']['gist']['url']
2372 2372 },
2373 2373 'msg': 'created new gist'
2374 2374 }
2375 2375 self._compare_ok(id_, expected, given=response.body)
2376 2376
2377 2377 @mock.patch.object(GistModel, 'create', raise_exception)
2378 2378 def test_api_create_gist_exception_occurred(self):
2379 2379 id_, params = _build_data(self.apikey_regular, 'create_gist',
2380 2380 files={})
2381 2381 response = api_call(self, params)
2382 2382 expected = 'failed to create gist'
2383 2383 self._compare_error(id_, expected, given=response.body)
2384 2384
2385 2385 def test_api_delete_gist(self):
2386 2386 gist_id = fixture.create_gist().gist_access_id
2387 2387 id_, params = _build_data(self.apikey, 'delete_gist',
2388 2388 gistid=gist_id)
2389 2389 response = api_call(self, params)
2390 2390 expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
2391 2391 self._compare_ok(id_, expected, given=response.body)
2392 2392
2393 2393 def test_api_delete_gist_regular_user(self):
2394 2394 gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id
2395 2395 id_, params = _build_data(self.apikey_regular, 'delete_gist',
2396 2396 gistid=gist_id)
2397 2397 response = api_call(self, params)
2398 2398 expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
2399 2399 self._compare_ok(id_, expected, given=response.body)
2400 2400
2401 2401 def test_api_delete_gist_regular_user_no_permission(self):
2402 2402 gist_id = fixture.create_gist().gist_access_id
2403 2403 id_, params = _build_data(self.apikey_regular, 'delete_gist',
2404 2404 gistid=gist_id)
2405 2405 response = api_call(self, params)
2406 2406 expected = 'gist `%s` does not exist' % (gist_id,)
2407 2407 self._compare_error(id_, expected, given=response.body)
2408 2408
2409 2409 @mock.patch.object(GistModel, 'delete', raise_exception)
2410 2410 def test_api_delete_gist_exception_occurred(self):
2411 2411 gist_id = fixture.create_gist().gist_access_id
2412 2412 id_, params = _build_data(self.apikey, 'delete_gist',
2413 2413 gistid=gist_id)
2414 2414 response = api_call(self, params)
2415 2415 expected = 'failed to delete gist ID:%s' % (gist_id,)
2416 2416 self._compare_error(id_, expected, given=response.body)
2417 2417
2418 2418 def test_api_get_ip(self):
2419 2419 id_, params = _build_data(self.apikey, 'get_ip')
2420 2420 response = api_call(self, params)
2421 2421 expected = {
2422 2422 'server_ip_addr': '0.0.0.0',
2423 2423 'user_ips': []
2424 2424 }
2425 2425 self._compare_ok(id_, expected, given=response.body)
2426 2426
2427 2427 def test_api_get_server_info(self):
2428 2428 id_, params = _build_data(self.apikey, 'get_server_info')
2429 2429 response = api_call(self, params)
2430 2430 expected = db.Setting.get_server_info()
2431 2431 self._compare_ok(id_, expected, given=response.body)
2432 2432
2433 2433 def test_api_get_changesets(self):
2434 2434 id_, params = _build_data(self.apikey, 'get_changesets',
2435 2435 repoid=self.REPO, start=0, end=2)
2436 2436 response = api_call(self, params)
2437 2437 result = ext_json.loads(response.body)["result"]
2438 2438 assert len(result) == 3
2439 2439 assert 'message' in result[0]
2440 2440 assert 'added' not in result[0]
2441 2441
2442 2442 def test_api_get_changesets_with_max_revisions(self):
2443 2443 id_, params = _build_data(self.apikey, 'get_changesets',
2444 2444 repoid=self.REPO, start_date="2011-02-24T00:00:00", max_revisions=10)
2445 2445 response = api_call(self, params)
2446 2446 result = ext_json.loads(response.body)["result"]
2447 2447 assert len(result) == 10
2448 2448 assert 'message' in result[0]
2449 2449 assert 'added' not in result[0]
2450 2450
2451 2451 def test_api_get_changesets_with_branch(self):
2452 2452 if self.REPO == 'vcs_test_hg':
2453 2453 branch = 'stable'
2454 2454 else:
2455 2455 pytest.skip("skipping due to missing branches in git test repo")
2456 2456 id_, params = _build_data(self.apikey, 'get_changesets',
2457 2457 repoid=self.REPO, branch_name=branch, start_date="2011-02-24T00:00:00")
2458 2458 response = api_call(self, params)
2459 2459 result = ext_json.loads(response.body)["result"]
2460 2460 assert len(result) == 5
2461 2461 assert 'message' in result[0]
2462 2462 assert 'added' not in result[0]
2463 2463
2464 2464 def test_api_get_changesets_with_file_list(self):
2465 2465 id_, params = _build_data(self.apikey, 'get_changesets',
2466 2466 repoid=self.REPO, start_date="2010-04-07T23:30:30", end_date="2010-04-08T00:31:14", with_file_list=True)
2467 2467 response = api_call(self, params)
2468 2468 result = ext_json.loads(response.body)["result"]
2469 2469 assert len(result) == 3
2470 2470 assert 'message' in result[0]
2471 2471 assert 'added' in result[0]
2472 2472
2473 2473 def test_api_get_changeset(self):
2474 2474 review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
2475 2475 id_, params = _build_data(self.apikey, 'get_changeset',
2476 2476 repoid=self.REPO, raw_id=self.TEST_REVISION)
2477 2477 response = api_call(self, params)
2478 2478 result = ext_json.loads(response.body)["result"]
2479 2479 assert result["raw_id"] == self.TEST_REVISION
2480 2480 assert "reviews" not in result
2481 2481
2482 2482 def test_api_get_changeset_with_reviews(self):
2483 2483 reviewobjs = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
2484 2484 id_, params = _build_data(self.apikey, 'get_changeset',
2485 2485 repoid=self.REPO, raw_id=self.TEST_REVISION,
2486 2486 with_reviews=True)
2487 2487 response = api_call(self, params)
2488 2488 result = ext_json.loads(response.body)["result"]
2489 2489 assert result["raw_id"] == self.TEST_REVISION
2490 2490 assert "reviews" in result
2491 2491 assert len(result["reviews"]) == 1
2492 2492 review = result["reviews"][0]
2493 2493 expected = {
2494 2494 'status': 'approved',
2495 2495 'modified_at': reviewobjs[0].modified_at.replace(microsecond=0).isoformat(),
2496 2496 'reviewer': 'test_admin',
2497 2497 }
2498 2498 assert review == expected
2499 2499
2500 2500 def test_api_get_changeset_that_does_not_exist(self):
2501 2501 """ Fetch changeset status for non-existant changeset.
2502 2502 revision id is the above git hash used in the test above with the
2503 2503 last 3 nibbles replaced with 0xf. Should not exist for git _or_ hg.
2504 2504 """
2505 2505 id_, params = _build_data(self.apikey, 'get_changeset',
2506 2506 repoid=self.REPO, raw_id = '7ab37bc680b4aa72c34d07b230c866c28e9fcfff')
2507 2507 response = api_call(self, params)
2508 2508 expected = 'Changeset %s does not exist' % ('7ab37bc680b4aa72c34d07b230c866c28e9fcfff',)
2509 2509 self._compare_error(id_, expected, given=response.body)
2510 2510
2511 2511 def test_api_get_changeset_without_permission(self):
2512 2512 review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
2513 2513 RepoModel().revoke_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN)
2514 2514 RepoModel().revoke_user_permission(repo=self.REPO, user="default")
2515 2515 id_, params = _build_data(self.apikey_regular, 'get_changeset',
2516 2516 repoid=self.REPO, raw_id=self.TEST_REVISION)
2517 2517 response = api_call(self, params)
2518 2518 expected = 'Access denied to repo %s' % self.REPO
2519 2519 self._compare_error(id_, expected, given=response.body)
2520 2520
2521 2521 def test_api_get_pullrequest(self):
2522 2522 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'get test')
2523 2523 random_id = random.randrange(1, 9999)
2524 2524 params = ascii_bytes(ext_json.dumps({
2525 2525 "id": random_id,
2526 2526 "api_key": self.apikey,
2527 2527 "method": 'get_pullrequest',
2528 2528 "args": {"pullrequest_id": pull_request_id},
2529 2529 }))
2530 2530 response = api_call(self, params)
2531 2531 pullrequest = db.PullRequest().get(pull_request_id)
2532 2532 expected = {
2533 2533 "status": "new",
2534 2534 "pull_request_id": pull_request_id,
2535 2535 "description": "No description",
2536 2536 "url": "/%s/pull-request/%s/_/%s" % (self.REPO, pull_request_id, "stable"),
2537 2537 "reviewers": [{"username": "test_regular"}],
2538 2538 "org_repo_url": "http://localhost:80/%s" % self.REPO,
2539 2539 "org_ref_parts": ["branch", "stable", self.TEST_PR_SRC],
2540 2540 "other_ref_parts": ["branch", "default", self.TEST_PR_DST],
2541 2541 "comments": [{"username": base.TEST_USER_ADMIN_LOGIN, "text": "",
2542 2542 "comment_id": pullrequest.comments[0].comment_id}],
2543 2543 "owner": base.TEST_USER_ADMIN_LOGIN,
2544 2544 "statuses": [{"status": "under_review", "reviewer": base.TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00"} for i in range(0, len(self.TEST_PR_REVISIONS))],
2545 2545 "title": "get test",
2546 2546 "revisions": self.TEST_PR_REVISIONS,
2547 2547 "created_on": "2000-01-01T00:00:00",
2548 2548 "updated_on": "2000-01-01T00:00:00",
2549 2549 }
2550 2550 self._compare_ok(random_id, expected,
2551 2551 given=re.sub(br"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d",
2552 2552 b"2000-01-01T00:00:00", response.body))
2553 2553
2554 2554 def test_api_close_pullrequest(self):
2555 2555 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'close test')
2556 2556 random_id = random.randrange(1, 9999)
2557 2557 params = ascii_bytes(ext_json.dumps({
2558 2558 "id": random_id,
2559 2559 "api_key": self.apikey,
2560 2560 "method": "comment_pullrequest",
2561 2561 "args": {"pull_request_id": pull_request_id, "close_pr": True},
2562 2562 }))
2563 2563 response = api_call(self, params)
2564 2564 self._compare_ok(random_id, True, given=response.body)
2565 2565 pullrequest = db.PullRequest().get(pull_request_id)
2566 2566 assert pullrequest.comments[-1].text == ''
2567 2567 assert pullrequest.status == db.PullRequest.STATUS_CLOSED
2568 2568 assert pullrequest.is_closed() == True
2569 2569
2570 2570 def test_api_status_pullrequest(self):
2571 2571 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "status test")
2572 2572
2573 2573 random_id = random.randrange(1, 9999)
2574 2574 params = ascii_bytes(ext_json.dumps({
2575 2575 "id": random_id,
2576 2576 "api_key": db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN).api_key,
2577 2577 "method": "comment_pullrequest",
2578 2578 "args": {"pull_request_id": pull_request_id, "status": db.ChangesetStatus.STATUS_APPROVED},
2579 2579 }))
2580 2580 response = api_call(self, params)
2581 2581 pullrequest = db.PullRequest().get(pull_request_id)
2582 2582 self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body)
2583 2583 assert db.ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
2584 2584 params = ascii_bytes(ext_json.dumps({
2585 2585 "id": random_id,
2586 2586 "api_key": db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN).api_key,
2587 2587 "method": "comment_pullrequest",
2588 2588 "args": {"pull_request_id": pull_request_id, "status": db.ChangesetStatus.STATUS_APPROVED},
2589 2589 }))
2590 2590 response = api_call(self, params)
2591 2591 self._compare_ok(random_id, True, given=response.body)
2592 2592 pullrequest = db.PullRequest().get(pull_request_id)
2593 2593 assert db.ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
2594 2594
2595 2595 def test_api_comment_pullrequest(self):
2596 2596 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "comment test")
2597 2597 random_id = random.randrange(1, 9999)
2598 2598 params = ascii_bytes(ext_json.dumps({
2599 2599 "id": random_id,
2600 2600 "api_key": self.apikey,
2601 2601 "method": "comment_pullrequest",
2602 2602 "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"},
2603 2603 }))
2604 2604 response = api_call(self, params)
2605 2605 self._compare_ok(random_id, True, given=response.body)
2606 2606 pullrequest = db.PullRequest().get(pull_request_id)
2607 2607 assert pullrequest.comments[-1].text == 'Looks good to me'
2608 2608
2609 2609 def test_api_edit_reviewers_add_single(self):
2610 2610 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2611 2611 pullrequest = db.PullRequest().get(pull_request_id)
2612 2612 pullrequest.owner = self.test_user
2613 2613 random_id = random.randrange(1, 9999)
2614 2614 params = ascii_bytes(ext_json.dumps({
2615 2615 "id": random_id,
2616 2616 "api_key": self.apikey_regular,
2617 2617 "method": "edit_reviewers",
2618 2618 "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
2619 2619 }))
2620 2620 response = api_call(self, params)
2621 2621 expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN], 'already_present': [], 'removed': [] }
2622 2622
2623 2623 self._compare_ok(random_id, expected, given=response.body)
2624 2624 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2625 2625
2626 2626 def test_api_edit_reviewers_add_nonexistent(self):
2627 2627 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2628 2628 pullrequest = db.PullRequest().get(pull_request_id)
2629 2629 pullrequest.owner = self.test_user
2630 2630 random_id = random.randrange(1, 9999)
2631 2631 params = ascii_bytes(ext_json.dumps({
2632 2632 "id": random_id,
2633 2633 "api_key": self.apikey_regular,
2634 2634 "method": "edit_reviewers",
2635 2635 "args": {"pull_request_id": pull_request_id, "add": 999},
2636 2636 }))
2637 2637 response = api_call(self, params)
2638 2638
2639 2639 self._compare_error(random_id, "user `999` does not exist", given=response.body)
2640 2640
2641 2641 def test_api_edit_reviewers_add_multiple(self):
2642 2642 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2643 2643 pullrequest = db.PullRequest().get(pull_request_id)
2644 2644 pullrequest.owner = self.test_user
2645 2645 random_id = random.randrange(1, 9999)
2646 2646 params = ascii_bytes(ext_json.dumps({
2647 2647 "id": random_id,
2648 2648 "api_key": self.apikey_regular,
2649 2649 "method": "edit_reviewers",
2650 2650 "args": {
2651 2651 "pull_request_id": pull_request_id,
2652 2652 "add": [ self.TEST_USER_LOGIN, base.TEST_USER_REGULAR2_LOGIN ]
2653 2653 },
2654 2654 }))
2655 2655 response = api_call(self, params)
2656 2656 # list order depends on python sorting hash, which is randomized
2657 2657 assert set(ext_json.loads(response.body)['result']['added']) == set([base.TEST_USER_REGULAR2_LOGIN, self.TEST_USER_LOGIN])
2658 2658 assert set(ext_json.loads(response.body)['result']['already_present']) == set()
2659 2659 assert set(ext_json.loads(response.body)['result']['removed']) == set()
2660 2660
2661 2661 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2662 2662 assert db.User.get_by_username(self.TEST_USER_LOGIN) in pullrequest.get_reviewer_users()
2663 2663
2664 2664 def test_api_edit_reviewers_add_already_present(self):
2665 2665 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2666 2666 pullrequest = db.PullRequest().get(pull_request_id)
2667 2667 pullrequest.owner = self.test_user
2668 2668 random_id = random.randrange(1, 9999)
2669 2669 params = ascii_bytes(ext_json.dumps({
2670 2670 "id": random_id,
2671 2671 "api_key": self.apikey_regular,
2672 2672 "method": "edit_reviewers",
2673 2673 "args": {
2674 2674 "pull_request_id": pull_request_id,
2675 2675 "add": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN ]
2676 2676 },
2677 2677 }))
2678 2678 response = api_call(self, params)
2679 2679 expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN],
2680 2680 'already_present': [base.TEST_USER_REGULAR_LOGIN],
2681 2681 'removed': [],
2682 2682 }
2683 2683
2684 2684 self._compare_ok(random_id, expected, given=response.body)
2685 2685 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2686 2686 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2687 2687
2688 2688 def test_api_edit_reviewers_add_closed(self):
2689 2689 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2690 2690 pullrequest = db.PullRequest().get(pull_request_id)
2691 2691 pullrequest.owner = self.test_user
2692 2692 PullRequestModel().close_pull_request(pull_request_id)
2693 2693 random_id = random.randrange(1, 9999)
2694 2694 params = ascii_bytes(ext_json.dumps({
2695 2695 "id": random_id,
2696 2696 "api_key": self.apikey_regular,
2697 2697 "method": "edit_reviewers",
2698 2698 "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
2699 2699 }))
2700 2700 response = api_call(self, params)
2701 2701 self._compare_error(random_id, "Cannot edit reviewers of a closed pull request.", given=response.body)
2702 2702
2703 2703 def test_api_edit_reviewers_add_not_owner(self):
2704 2704 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2705 2705 pullrequest = db.PullRequest().get(pull_request_id)
2706 2706 pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
2707 2707 random_id = random.randrange(1, 9999)
2708 2708 params = ascii_bytes(ext_json.dumps({
2709 2709 "id": random_id,
2710 2710 "api_key": self.apikey_regular,
2711 2711 "method": "edit_reviewers",
2712 2712 "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
2713 2713 }))
2714 2714 response = api_call(self, params)
2715 2715 self._compare_error(random_id, "No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.", given=response.body)
2716 2716
2717 2717
2718 2718 def test_api_edit_reviewers_remove_single(self):
2719 2719 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2720 2720 pullrequest = db.PullRequest().get(pull_request_id)
2721 2721 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2722 2722
2723 2723 pullrequest.owner = self.test_user
2724 2724 random_id = random.randrange(1, 9999)
2725 2725 params = ascii_bytes(ext_json.dumps({
2726 2726 "id": random_id,
2727 2727 "api_key": self.apikey_regular,
2728 2728 "method": "edit_reviewers",
2729 2729 "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
2730 2730 }))
2731 2731 response = api_call(self, params)
2732 2732
2733 2733 expected = { 'added': [],
2734 2734 'already_present': [],
2735 2735 'removed': [base.TEST_USER_REGULAR_LOGIN],
2736 2736 }
2737 2737 self._compare_ok(random_id, expected, given=response.body)
2738 2738 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
2739 2739
2740 2740 def test_api_edit_reviewers_remove_nonexistent(self):
2741 2741 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2742 2742 pullrequest = db.PullRequest().get(pull_request_id)
2743 2743 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2744 2744
2745 2745 pullrequest.owner = self.test_user
2746 2746 random_id = random.randrange(1, 9999)
2747 2747 params = ascii_bytes(ext_json.dumps({
2748 2748 "id": random_id,
2749 2749 "api_key": self.apikey_regular,
2750 2750 "method": "edit_reviewers",
2751 2751 "args": {"pull_request_id": pull_request_id, "remove": 999},
2752 2752 }))
2753 2753 response = api_call(self, params)
2754 2754
2755 2755 self._compare_error(random_id, "user `999` does not exist", given=response.body)
2756 2756 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2757 2757
2758 2758 def test_api_edit_reviewers_remove_nonpresent(self):
2759 2759 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2760 2760 pullrequest = db.PullRequest().get(pull_request_id)
2761 2761 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2762 2762 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2763 2763
2764 2764 pullrequest.owner = self.test_user
2765 2765 random_id = random.randrange(1, 9999)
2766 2766 params = ascii_bytes(ext_json.dumps({
2767 2767 "id": random_id,
2768 2768 "api_key": self.apikey_regular,
2769 2769 "method": "edit_reviewers",
2770 2770 "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR2_LOGIN},
2771 2771 }))
2772 2772 response = api_call(self, params)
2773 2773
2774 2774 # NOTE: no explicit indication that removed user was not even a reviewer
2775 2775 expected = { 'added': [],
2776 2776 'already_present': [],
2777 2777 'removed': [base.TEST_USER_REGULAR2_LOGIN],
2778 2778 }
2779 2779 self._compare_ok(random_id, expected, given=response.body)
2780 2780 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2781 2781 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2782 2782
2783 2783 def test_api_edit_reviewers_remove_multiple(self):
2784 2784 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2785 2785 pullrequest = db.PullRequest().get(pull_request_id)
2786 2786 prr = db.PullRequestReviewer(db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN), pullrequest)
2787 2787 meta.Session().add(prr)
2788 2788 meta.Session().commit()
2789 2789
2790 2790 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2791 2791 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2792 2792
2793 2793 pullrequest.owner = self.test_user
2794 2794 random_id = random.randrange(1, 9999)
2795 2795 params = ascii_bytes(ext_json.dumps({
2796 2796 "id": random_id,
2797 2797 "api_key": self.apikey_regular,
2798 2798 "method": "edit_reviewers",
2799 2799 "args": {"pull_request_id": pull_request_id, "remove": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN ] },
2800 2800 }))
2801 2801 response = api_call(self, params)
2802 2802
2803 2803 # list order depends on python sorting hash, which is randomized
2804 2804 assert set(ext_json.loads(response.body)['result']['added']) == set()
2805 2805 assert set(ext_json.loads(response.body)['result']['already_present']) == set()
2806 2806 assert set(ext_json.loads(response.body)['result']['removed']) == set([base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN])
2807 2807 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
2808 2808 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2809 2809
2810 2810 def test_api_edit_reviewers_remove_closed(self):
2811 2811 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2812 2812 pullrequest = db.PullRequest().get(pull_request_id)
2813 2813 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2814 2814 PullRequestModel().close_pull_request(pull_request_id)
2815 2815
2816 2816 pullrequest.owner = self.test_user
2817 2817 random_id = random.randrange(1, 9999)
2818 2818 params = ascii_bytes(ext_json.dumps({
2819 2819 "id": random_id,
2820 2820 "api_key": self.apikey_regular,
2821 2821 "method": "edit_reviewers",
2822 2822 "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
2823 2823 }))
2824 2824 response = api_call(self, params)
2825 2825
2826 2826 self._compare_error(random_id, "Cannot edit reviewers of a closed pull request.", given=response.body)
2827 2827 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2828 2828
2829 2829 def test_api_edit_reviewers_remove_not_owner(self):
2830 2830 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2831 2831 pullrequest = db.PullRequest().get(pull_request_id)
2832 2832 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2833 2833
2834 2834 pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
2835 2835 random_id = random.randrange(1, 9999)
2836 2836 params = ascii_bytes(ext_json.dumps({
2837 2837 "id": random_id,
2838 2838 "api_key": self.apikey_regular,
2839 2839 "method": "edit_reviewers",
2840 2840 "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
2841 2841 }))
2842 2842 response = api_call(self, params)
2843 2843
2844 2844 self._compare_error(random_id, "No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.", given=response.body)
2845 2845 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2846 2846
2847 2847 def test_api_edit_reviewers_add_remove_single(self):
2848 2848 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2849 2849 pullrequest = db.PullRequest().get(pull_request_id)
2850 2850 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2851 2851 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2852 2852
2853 2853 pullrequest.owner = self.test_user
2854 2854 random_id = random.randrange(1, 9999)
2855 2855 params = ascii_bytes(ext_json.dumps({
2856 2856 "id": random_id,
2857 2857 "api_key": self.apikey_regular,
2858 2858 "method": "edit_reviewers",
2859 2859 "args": {"pull_request_id": pull_request_id,
2860 2860 "add": base.TEST_USER_REGULAR2_LOGIN,
2861 2861 "remove": base.TEST_USER_REGULAR_LOGIN
2862 2862 },
2863 2863 }))
2864 2864 response = api_call(self, params)
2865 2865
2866 2866 expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN],
2867 2867 'already_present': [],
2868 2868 'removed': [base.TEST_USER_REGULAR_LOGIN],
2869 2869 }
2870 2870 self._compare_ok(random_id, expected, given=response.body)
2871 2871 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
2872 2872 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2873 2873
2874 2874 def test_api_edit_reviewers_add_remove_multiple(self):
2875 2875 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2876 2876 pullrequest = db.PullRequest().get(pull_request_id)
2877 2877 prr = db.PullRequestReviewer(db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN), pullrequest)
2878 2878 meta.Session().add(prr)
2879 2879 meta.Session().commit()
2880 2880 assert db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) in pullrequest.get_reviewer_users()
2881 2881 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2882 2882 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
2883 2883
2884 2884 pullrequest.owner = self.test_user
2885 2885 random_id = random.randrange(1, 9999)
2886 2886 params = ascii_bytes(ext_json.dumps({
2887 2887 "id": random_id,
2888 2888 "api_key": self.apikey_regular,
2889 2889 "method": "edit_reviewers",
2890 2890 "args": {"pull_request_id": pull_request_id,
2891 2891 "add": [ base.TEST_USER_REGULAR2_LOGIN ],
2892 2892 "remove": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_ADMIN_LOGIN ],
2893 2893 },
2894 2894 }))
2895 2895 response = api_call(self, params)
2896 2896
2897 2897 # list order depends on python sorting hash, which is randomized
2898 2898 assert set(ext_json.loads(response.body)['result']['added']) == set([base.TEST_USER_REGULAR2_LOGIN])
2899 2899 assert set(ext_json.loads(response.body)['result']['already_present']) == set()
2900 2900 assert set(ext_json.loads(response.body)['result']['removed']) == set([base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_ADMIN_LOGIN])
2901 2901 assert db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN) not in pullrequest.get_reviewer_users()
2902 2902 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
2903 2903 assert db.User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
2904 2904
2905 2905 def test_api_edit_reviewers_invalid_params(self):
2906 2906 pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
2907 2907 pullrequest = db.PullRequest().get(pull_request_id)
2908 2908 assert db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
2909 2909
2910 2910 pullrequest.owner = db.User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
2911 2911 random_id = random.randrange(1, 9999)
2912 2912 params = ascii_bytes(ext_json.dumps({
2913 2913 "id": random_id,
2914 2914 "api_key": self.apikey_regular,
2915 2915 "method": "edit_reviewers",
2916 2916 "args": {"pull_request_id": pull_request_id},
2917 2917 }))
2918 2918 response = api_call(self, params)
2919 2919
2920 2920 self._compare_error(random_id, "Invalid request. Neither 'add' nor 'remove' is specified.", given=response.body)
2921 2921 assert ext_json.loads(response.body)['result'] is None
General Comments 0
You need to be logged in to leave comments. Login now