##// END OF EJS Templates
usergroup: more descriptive error message when deleting user group
Joseph Rivera -
r4769:2181d005 default
parent child Browse files
Show More
@@ -1,389 +1,390 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 # This program is free software: you can redistribute it and/or modify
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
5 # (at your option) any later version.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU General Public License
12 # You should have received a copy of the GNU General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 """
14 """
15 kallithea.model.users_group
15 kallithea.model.users_group
16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
17
17
18 user group model for Kallithea
18 user group model for Kallithea
19
19
20 This file was forked by the Kallithea project in July 2014.
20 This file was forked by the Kallithea project in July 2014.
21 Original author and date, and relevant copyright and licensing information is below:
21 Original author and date, and relevant copyright and licensing information is below:
22 :created_on: Oct 1, 2011
22 :created_on: Oct 1, 2011
23 :author: nvinot, marcink
23 :author: nvinot, marcink
24 """
24 """
25
25
26
26
27 import logging
27 import logging
28 import traceback
28 import traceback
29
29
30 from kallithea.model import BaseModel
30 from kallithea.model import BaseModel
31 from kallithea.model.db import UserGroupMember, UserGroup,\
31 from kallithea.model.db import UserGroupMember, UserGroup,\
32 UserGroupRepoToPerm, Permission, UserGroupToPerm, User, UserUserGroupToPerm,\
32 UserGroupRepoToPerm, Permission, UserGroupToPerm, User, UserUserGroupToPerm,\
33 UserGroupUserGroupToPerm
33 UserGroupUserGroupToPerm
34 from kallithea.lib.exceptions import UserGroupsAssignedException,\
34 from kallithea.lib.exceptions import UserGroupsAssignedException,\
35 RepoGroupAssignmentError
35 RepoGroupAssignmentError
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
40 class UserGroupModel(BaseModel):
40 class UserGroupModel(BaseModel):
41
41
42 cls = UserGroup
42 cls = UserGroup
43
43
44 def _get_user_group(self, user_group):
44 def _get_user_group(self, user_group):
45 return self._get_instance(UserGroup, user_group,
45 return self._get_instance(UserGroup, user_group,
46 callback=UserGroup.get_by_group_name)
46 callback=UserGroup.get_by_group_name)
47
47
48 def _create_default_perms(self, user_group):
48 def _create_default_perms(self, user_group):
49 # create default permission
49 # create default permission
50 default_perm = 'usergroup.read'
50 default_perm = 'usergroup.read'
51 def_user = User.get_default_user()
51 def_user = User.get_default_user()
52 for p in def_user.user_perms:
52 for p in def_user.user_perms:
53 if p.permission.permission_name.startswith('usergroup.'):
53 if p.permission.permission_name.startswith('usergroup.'):
54 default_perm = p.permission.permission_name
54 default_perm = p.permission.permission_name
55 break
55 break
56
56
57 user_group_to_perm = UserUserGroupToPerm()
57 user_group_to_perm = UserUserGroupToPerm()
58 user_group_to_perm.permission = Permission.get_by_key(default_perm)
58 user_group_to_perm.permission = Permission.get_by_key(default_perm)
59
59
60 user_group_to_perm.user_group = user_group
60 user_group_to_perm.user_group = user_group
61 user_group_to_perm.user_id = def_user.user_id
61 user_group_to_perm.user_id = def_user.user_id
62 return user_group_to_perm
62 return user_group_to_perm
63
63
64 def _update_permissions(self, user_group, perms_new=None,
64 def _update_permissions(self, user_group, perms_new=None,
65 perms_updates=None):
65 perms_updates=None):
66 from kallithea.lib.auth import HasUserGroupPermissionAny
66 from kallithea.lib.auth import HasUserGroupPermissionAny
67 if not perms_new:
67 if not perms_new:
68 perms_new = []
68 perms_new = []
69 if not perms_updates:
69 if not perms_updates:
70 perms_updates = []
70 perms_updates = []
71
71
72 # update permissions
72 # update permissions
73 for member, perm, member_type in perms_updates:
73 for member, perm, member_type in perms_updates:
74 if member_type == 'user':
74 if member_type == 'user':
75 # this updates existing one
75 # this updates existing one
76 self.grant_user_permission(
76 self.grant_user_permission(
77 user_group=user_group, user=member, perm=perm
77 user_group=user_group, user=member, perm=perm
78 )
78 )
79 else:
79 else:
80 #check if we have permissions to alter this usergroup
80 #check if we have permissions to alter this usergroup
81 if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write',
81 if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write',
82 'usergroup.admin')(member):
82 'usergroup.admin')(member):
83 self.grant_user_group_permission(
83 self.grant_user_group_permission(
84 target_user_group=user_group, user_group=member, perm=perm
84 target_user_group=user_group, user_group=member, perm=perm
85 )
85 )
86 # set new permissions
86 # set new permissions
87 for member, perm, member_type in perms_new:
87 for member, perm, member_type in perms_new:
88 if member_type == 'user':
88 if member_type == 'user':
89 self.grant_user_permission(
89 self.grant_user_permission(
90 user_group=user_group, user=member, perm=perm
90 user_group=user_group, user=member, perm=perm
91 )
91 )
92 else:
92 else:
93 #check if we have permissions to alter this usergroup
93 #check if we have permissions to alter this usergroup
94 if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write',
94 if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write',
95 'usergroup.admin')(member):
95 'usergroup.admin')(member):
96 self.grant_user_group_permission(
96 self.grant_user_group_permission(
97 target_user_group=user_group, user_group=member, perm=perm
97 target_user_group=user_group, user_group=member, perm=perm
98 )
98 )
99
99
100 def get(self, user_group_id, cache=False):
100 def get(self, user_group_id, cache=False):
101 return UserGroup.get(user_group_id)
101 return UserGroup.get(user_group_id)
102
102
103 def get_group(self, user_group):
103 def get_group(self, user_group):
104 return self._get_user_group(user_group)
104 return self._get_user_group(user_group)
105
105
106 def get_by_name(self, name, cache=False, case_insensitive=False):
106 def get_by_name(self, name, cache=False, case_insensitive=False):
107 return UserGroup.get_by_group_name(name, cache, case_insensitive)
107 return UserGroup.get_by_group_name(name, cache, case_insensitive)
108
108
109 def create(self, name, description, owner, active=True, group_data=None):
109 def create(self, name, description, owner, active=True, group_data=None):
110 try:
110 try:
111 new_user_group = UserGroup()
111 new_user_group = UserGroup()
112 new_user_group.user = self._get_user(owner)
112 new_user_group.user = self._get_user(owner)
113 new_user_group.users_group_name = name
113 new_user_group.users_group_name = name
114 new_user_group.user_group_description = description
114 new_user_group.user_group_description = description
115 new_user_group.users_group_active = active
115 new_user_group.users_group_active = active
116 if group_data:
116 if group_data:
117 new_user_group.group_data = group_data
117 new_user_group.group_data = group_data
118 self.sa.add(new_user_group)
118 self.sa.add(new_user_group)
119 perm_obj = self._create_default_perms(new_user_group)
119 perm_obj = self._create_default_perms(new_user_group)
120 self.sa.add(perm_obj)
120 self.sa.add(perm_obj)
121
121
122 self.grant_user_permission(user_group=new_user_group,
122 self.grant_user_permission(user_group=new_user_group,
123 user=owner, perm='usergroup.admin')
123 user=owner, perm='usergroup.admin')
124
124
125 return new_user_group
125 return new_user_group
126 except Exception:
126 except Exception:
127 log.error(traceback.format_exc())
127 log.error(traceback.format_exc())
128 raise
128 raise
129
129
130 def update(self, user_group, form_data):
130 def update(self, user_group, form_data):
131
131
132 try:
132 try:
133 user_group = self._get_user_group(user_group)
133 user_group = self._get_user_group(user_group)
134
134
135 for k, v in form_data.items():
135 for k, v in form_data.items():
136 if k == 'users_group_members':
136 if k == 'users_group_members':
137 user_group.members = []
137 user_group.members = []
138 self.sa.flush()
138 self.sa.flush()
139 members_list = []
139 members_list = []
140 if v:
140 if v:
141 v = [v] if isinstance(v, basestring) else v
141 v = [v] if isinstance(v, basestring) else v
142 for u_id in set(v):
142 for u_id in set(v):
143 member = UserGroupMember(user_group.users_group_id, u_id)
143 member = UserGroupMember(user_group.users_group_id, u_id)
144 members_list.append(member)
144 members_list.append(member)
145 setattr(user_group, 'members', members_list)
145 setattr(user_group, 'members', members_list)
146 setattr(user_group, k, v)
146 setattr(user_group, k, v)
147
147
148 self.sa.add(user_group)
148 self.sa.add(user_group)
149 except Exception:
149 except Exception:
150 log.error(traceback.format_exc())
150 log.error(traceback.format_exc())
151 raise
151 raise
152
152
153 def delete(self, user_group, force=False):
153 def delete(self, user_group, force=False):
154 """
154 """
155 Deletes repository group, unless force flag is used
155 Deletes user group, unless force flag is used
156 raises exception if there are members in that group, else deletes
156 raises exception if there are members in that group, else deletes
157 group and users
157 group and users
158
158
159 :param user_group:
159 :param user_group:
160 :param force:
160 :param force:
161 """
161 """
162 user_group = self._get_user_group(user_group)
162 user_group = self._get_user_group(user_group)
163 try:
163 try:
164 # check if this group is not assigned to repo
164 # check if this group is not assigned to repo
165 assigned_groups = UserGroupRepoToPerm.query()\
165 assigned_groups = UserGroupRepoToPerm.query()\
166 .filter(UserGroupRepoToPerm.users_group == user_group).all()
166 .filter(UserGroupRepoToPerm.users_group == user_group).all()
167 assigned_groups = [x.repository.repo_name for x in assigned_groups]
167
168
168 if assigned_groups and not force:
169 if assigned_groups and not force:
169 raise UserGroupsAssignedException(
170 raise UserGroupsAssignedException(
170 'RepoGroup assigned to %s' % assigned_groups)
171 'User Group assigned to %s' % ", ".join(assigned_groups))
171 self.sa.delete(user_group)
172 self.sa.delete(user_group)
172 except Exception:
173 except Exception:
173 log.error(traceback.format_exc())
174 log.error(traceback.format_exc())
174 raise
175 raise
175
176
176 def add_user_to_group(self, user_group, user):
177 def add_user_to_group(self, user_group, user):
177 user_group = self._get_user_group(user_group)
178 user_group = self._get_user_group(user_group)
178 user = self._get_user(user)
179 user = self._get_user(user)
179
180
180 for m in user_group.members:
181 for m in user_group.members:
181 u = m.user
182 u = m.user
182 if u.user_id == user.user_id:
183 if u.user_id == user.user_id:
183 # user already in the group, skip
184 # user already in the group, skip
184 return True
185 return True
185
186
186 try:
187 try:
187 user_group_member = UserGroupMember()
188 user_group_member = UserGroupMember()
188 user_group_member.user = user
189 user_group_member.user = user
189 user_group_member.users_group = user_group
190 user_group_member.users_group = user_group
190
191
191 user_group.members.append(user_group_member)
192 user_group.members.append(user_group_member)
192 user.group_member.append(user_group_member)
193 user.group_member.append(user_group_member)
193
194
194 self.sa.add(user_group_member)
195 self.sa.add(user_group_member)
195 return user_group_member
196 return user_group_member
196 except Exception:
197 except Exception:
197 log.error(traceback.format_exc())
198 log.error(traceback.format_exc())
198 raise
199 raise
199
200
200 def remove_user_from_group(self, user_group, user):
201 def remove_user_from_group(self, user_group, user):
201 user_group = self._get_user_group(user_group)
202 user_group = self._get_user_group(user_group)
202 user = self._get_user(user)
203 user = self._get_user(user)
203
204
204 user_group_member = None
205 user_group_member = None
205 for m in user_group.members:
206 for m in user_group.members:
206 if m.user.user_id == user.user_id:
207 if m.user.user_id == user.user_id:
207 # Found this user's membership row
208 # Found this user's membership row
208 user_group_member = m
209 user_group_member = m
209 break
210 break
210
211
211 if user_group_member:
212 if user_group_member:
212 try:
213 try:
213 self.sa.delete(user_group_member)
214 self.sa.delete(user_group_member)
214 return True
215 return True
215 except Exception:
216 except Exception:
216 log.error(traceback.format_exc())
217 log.error(traceback.format_exc())
217 raise
218 raise
218 else:
219 else:
219 # User isn't in that group
220 # User isn't in that group
220 return False
221 return False
221
222
222 def has_perm(self, user_group, perm):
223 def has_perm(self, user_group, perm):
223 user_group = self._get_user_group(user_group)
224 user_group = self._get_user_group(user_group)
224 perm = self._get_perm(perm)
225 perm = self._get_perm(perm)
225
226
226 return UserGroupToPerm.query()\
227 return UserGroupToPerm.query()\
227 .filter(UserGroupToPerm.users_group == user_group)\
228 .filter(UserGroupToPerm.users_group == user_group)\
228 .filter(UserGroupToPerm.permission == perm).scalar() is not None
229 .filter(UserGroupToPerm.permission == perm).scalar() is not None
229
230
230 def grant_perm(self, user_group, perm):
231 def grant_perm(self, user_group, perm):
231 user_group = self._get_user_group(user_group)
232 user_group = self._get_user_group(user_group)
232 perm = self._get_perm(perm)
233 perm = self._get_perm(perm)
233
234
234 # if this permission is already granted skip it
235 # if this permission is already granted skip it
235 _perm = UserGroupToPerm.query()\
236 _perm = UserGroupToPerm.query()\
236 .filter(UserGroupToPerm.users_group == user_group)\
237 .filter(UserGroupToPerm.users_group == user_group)\
237 .filter(UserGroupToPerm.permission == perm)\
238 .filter(UserGroupToPerm.permission == perm)\
238 .scalar()
239 .scalar()
239 if _perm:
240 if _perm:
240 return
241 return
241
242
242 new = UserGroupToPerm()
243 new = UserGroupToPerm()
243 new.users_group = user_group
244 new.users_group = user_group
244 new.permission = perm
245 new.permission = perm
245 self.sa.add(new)
246 self.sa.add(new)
246 return new
247 return new
247
248
248 def revokehas_permrevoke_permgrant_perm_perm(self, user_group, perm):
249 def revokehas_permrevoke_permgrant_perm_perm(self, user_group, perm):
249 user_group = self._get_user_group(user_group)
250 user_group = self._get_user_group(user_group)
250 perm = self._get_perm(perm)
251 perm = self._get_perm(perm)
251
252
252 obj = UserGroupToPerm.query()\
253 obj = UserGroupToPerm.query()\
253 .filter(UserGroupToPerm.users_group == user_group)\
254 .filter(UserGroupToPerm.users_group == user_group)\
254 .filter(UserGroupToPerm.permission == perm).scalar()
255 .filter(UserGroupToPerm.permission == perm).scalar()
255 if obj:
256 if obj:
256 self.sa.delete(obj)
257 self.sa.delete(obj)
257
258
258 def grant_user_permission(self, user_group, user, perm):
259 def grant_user_permission(self, user_group, user, perm):
259 """
260 """
260 Grant permission for user on given user group, or update
261 Grant permission for user on given user group, or update
261 existing one if found
262 existing one if found
262
263
263 :param user_group: Instance of UserGroup, users_group_id,
264 :param user_group: Instance of UserGroup, users_group_id,
264 or users_group_name
265 or users_group_name
265 :param user: Instance of User, user_id or username
266 :param user: Instance of User, user_id or username
266 :param perm: Instance of Permission, or permission_name
267 :param perm: Instance of Permission, or permission_name
267 """
268 """
268
269
269 user_group = self._get_user_group(user_group)
270 user_group = self._get_user_group(user_group)
270 user = self._get_user(user)
271 user = self._get_user(user)
271 permission = self._get_perm(perm)
272 permission = self._get_perm(perm)
272
273
273 # check if we have that permission already
274 # check if we have that permission already
274 obj = self.sa.query(UserUserGroupToPerm)\
275 obj = self.sa.query(UserUserGroupToPerm)\
275 .filter(UserUserGroupToPerm.user == user)\
276 .filter(UserUserGroupToPerm.user == user)\
276 .filter(UserUserGroupToPerm.user_group == user_group)\
277 .filter(UserUserGroupToPerm.user_group == user_group)\
277 .scalar()
278 .scalar()
278 if obj is None:
279 if obj is None:
279 # create new !
280 # create new !
280 obj = UserUserGroupToPerm()
281 obj = UserUserGroupToPerm()
281 obj.user_group = user_group
282 obj.user_group = user_group
282 obj.user = user
283 obj.user = user
283 obj.permission = permission
284 obj.permission = permission
284 self.sa.add(obj)
285 self.sa.add(obj)
285 log.debug('Granted perm %s to %s on %s' % (perm, user, user_group))
286 log.debug('Granted perm %s to %s on %s' % (perm, user, user_group))
286 return obj
287 return obj
287
288
288 def revoke_user_permission(self, user_group, user):
289 def revoke_user_permission(self, user_group, user):
289 """
290 """
290 Revoke permission for user on given repository group
291 Revoke permission for user on given repository group
291
292
292 :param user_group: Instance of RepoGroup, repositories_group_id,
293 :param user_group: Instance of RepoGroup, repositories_group_id,
293 or repositories_group name
294 or repositories_group name
294 :param user: Instance of User, user_id or username
295 :param user: Instance of User, user_id or username
295 """
296 """
296
297
297 user_group = self._get_user_group(user_group)
298 user_group = self._get_user_group(user_group)
298 user = self._get_user(user)
299 user = self._get_user(user)
299
300
300 obj = self.sa.query(UserUserGroupToPerm)\
301 obj = self.sa.query(UserUserGroupToPerm)\
301 .filter(UserUserGroupToPerm.user == user)\
302 .filter(UserUserGroupToPerm.user == user)\
302 .filter(UserUserGroupToPerm.user_group == user_group)\
303 .filter(UserUserGroupToPerm.user_group == user_group)\
303 .scalar()
304 .scalar()
304 if obj:
305 if obj:
305 self.sa.delete(obj)
306 self.sa.delete(obj)
306 log.debug('Revoked perm on %s on %s' % (user_group, user))
307 log.debug('Revoked perm on %s on %s' % (user_group, user))
307
308
308 def grant_user_group_permission(self, target_user_group, user_group, perm):
309 def grant_user_group_permission(self, target_user_group, user_group, perm):
309 """
310 """
310 Grant user group permission for given target_user_group
311 Grant user group permission for given target_user_group
311
312
312 :param target_user_group:
313 :param target_user_group:
313 :param user_group:
314 :param user_group:
314 :param perm:
315 :param perm:
315 """
316 """
316 target_user_group = self._get_user_group(target_user_group)
317 target_user_group = self._get_user_group(target_user_group)
317 user_group = self._get_user_group(user_group)
318 user_group = self._get_user_group(user_group)
318 permission = self._get_perm(perm)
319 permission = self._get_perm(perm)
319 # forbid assigning same user group to itself
320 # forbid assigning same user group to itself
320 if target_user_group == user_group:
321 if target_user_group == user_group:
321 raise RepoGroupAssignmentError('target repo:%s cannot be '
322 raise RepoGroupAssignmentError('target repo:%s cannot be '
322 'assigned to itself' % target_user_group)
323 'assigned to itself' % target_user_group)
323
324
324 # check if we have that permission already
325 # check if we have that permission already
325 obj = self.sa.query(UserGroupUserGroupToPerm)\
326 obj = self.sa.query(UserGroupUserGroupToPerm)\
326 .filter(UserGroupUserGroupToPerm.target_user_group == target_user_group)\
327 .filter(UserGroupUserGroupToPerm.target_user_group == target_user_group)\
327 .filter(UserGroupUserGroupToPerm.user_group == user_group)\
328 .filter(UserGroupUserGroupToPerm.user_group == user_group)\
328 .scalar()
329 .scalar()
329 if obj is None:
330 if obj is None:
330 # create new !
331 # create new !
331 obj = UserGroupUserGroupToPerm()
332 obj = UserGroupUserGroupToPerm()
332 obj.user_group = user_group
333 obj.user_group = user_group
333 obj.target_user_group = target_user_group
334 obj.target_user_group = target_user_group
334 obj.permission = permission
335 obj.permission = permission
335 self.sa.add(obj)
336 self.sa.add(obj)
336 log.debug('Granted perm %s to %s on %s' % (perm, target_user_group, user_group))
337 log.debug('Granted perm %s to %s on %s' % (perm, target_user_group, user_group))
337 return obj
338 return obj
338
339
339 def revoke_user_group_permission(self, target_user_group, user_group):
340 def revoke_user_group_permission(self, target_user_group, user_group):
340 """
341 """
341 Revoke user group permission for given target_user_group
342 Revoke user group permission for given target_user_group
342
343
343 :param target_user_group:
344 :param target_user_group:
344 :param user_group:
345 :param user_group:
345 """
346 """
346 target_user_group = self._get_user_group(target_user_group)
347 target_user_group = self._get_user_group(target_user_group)
347 user_group = self._get_user_group(user_group)
348 user_group = self._get_user_group(user_group)
348
349
349 obj = self.sa.query(UserGroupUserGroupToPerm)\
350 obj = self.sa.query(UserGroupUserGroupToPerm)\
350 .filter(UserGroupUserGroupToPerm.target_user_group == target_user_group)\
351 .filter(UserGroupUserGroupToPerm.target_user_group == target_user_group)\
351 .filter(UserGroupUserGroupToPerm.user_group == user_group)\
352 .filter(UserGroupUserGroupToPerm.user_group == user_group)\
352 .scalar()
353 .scalar()
353 if obj:
354 if obj:
354 self.sa.delete(obj)
355 self.sa.delete(obj)
355 log.debug('Revoked perm on %s on %s' % (target_user_group, user_group))
356 log.debug('Revoked perm on %s on %s' % (target_user_group, user_group))
356
357
357 def enforce_groups(self, user, groups, extern_type=None):
358 def enforce_groups(self, user, groups, extern_type=None):
358 user = self._get_user(user)
359 user = self._get_user(user)
359 log.debug('Enforcing groups %s on user %s' % (user, groups))
360 log.debug('Enforcing groups %s on user %s' % (user, groups))
360 current_groups = user.group_member
361 current_groups = user.group_member
361 # find the external created groups
362 # find the external created groups
362 externals = [x.users_group for x in current_groups
363 externals = [x.users_group for x in current_groups
363 if 'extern_type' in x.users_group.group_data]
364 if 'extern_type' in x.users_group.group_data]
364
365
365 # calculate from what groups user should be removed
366 # calculate from what groups user should be removed
366 # externals that are not in groups
367 # externals that are not in groups
367 for gr in externals:
368 for gr in externals:
368 if gr.users_group_name not in groups:
369 if gr.users_group_name not in groups:
369 log.debug('Removing user %s from user group %s' % (user, gr))
370 log.debug('Removing user %s from user group %s' % (user, gr))
370 self.remove_user_from_group(gr, user)
371 self.remove_user_from_group(gr, user)
371
372
372 # now we calculate in which groups user should be == groups params
373 # now we calculate in which groups user should be == groups params
373 owner = User.get_first_admin().username
374 owner = User.get_first_admin().username
374 for gr in set(groups):
375 for gr in set(groups):
375 existing_group = UserGroup.get_by_group_name(gr)
376 existing_group = UserGroup.get_by_group_name(gr)
376 if not existing_group:
377 if not existing_group:
377 desc = 'Automatically created from plugin:%s' % extern_type
378 desc = 'Automatically created from plugin:%s' % extern_type
378 # we use first admin account to set the owner of the group
379 # we use first admin account to set the owner of the group
379 existing_group = UserGroupModel().create(gr, desc, owner,
380 existing_group = UserGroupModel().create(gr, desc, owner,
380 group_data={'extern_type': extern_type})
381 group_data={'extern_type': extern_type})
381
382
382 # we can only add users to special groups created via plugins
383 # we can only add users to special groups created via plugins
383 managed = 'extern_type' in existing_group.group_data
384 managed = 'extern_type' in existing_group.group_data
384 if managed:
385 if managed:
385 log.debug('Adding user %s to user group %s' % (user, gr))
386 log.debug('Adding user %s to user group %s' % (user, gr))
386 UserGroupModel().add_user_to_group(existing_group, user)
387 UserGroupModel().add_user_to_group(existing_group, user)
387 else:
388 else:
388 log.debug('Skipping addition to group %s since it is '
389 log.debug('Skipping addition to group %s since it is '
389 'not managed by auth plugins' % gr)
390 'not managed by auth plugins' % gr)
@@ -1,2295 +1,2295 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 # This program is free software: you can redistribute it and/or modify
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
5 # (at your option) any later version.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU General Public License
12 # You should have received a copy of the GNU General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14
14
15 """
15 """
16 tests for api. run with::
16 tests for api. run with::
17
17
18 KALLITHEA_WHOOSH_TEST_DISABLE=1 nosetests --with-coverage --cover-package=kallithea.controllers.api.api -x kallithea/tests/api
18 KALLITHEA_WHOOSH_TEST_DISABLE=1 nosetests --with-coverage --cover-package=kallithea.controllers.api.api -x kallithea/tests/api
19 """
19 """
20
20
21 from __future__ import with_statement
21 from __future__ import with_statement
22 import os
22 import os
23 import random
23 import random
24 import mock
24 import mock
25
25
26 from kallithea.tests import *
26 from kallithea.tests import *
27 from kallithea.tests.fixture import Fixture
27 from kallithea.tests.fixture import Fixture
28 from kallithea.lib.compat import json
28 from kallithea.lib.compat import json
29 from kallithea.lib.auth import AuthUser
29 from kallithea.lib.auth import AuthUser
30 from kallithea.model.user import UserModel
30 from kallithea.model.user import UserModel
31 from kallithea.model.user_group import UserGroupModel
31 from kallithea.model.user_group import UserGroupModel
32 from kallithea.model.repo import RepoModel
32 from kallithea.model.repo import RepoModel
33 from kallithea.model.repo_group import RepoGroupModel
33 from kallithea.model.repo_group import RepoGroupModel
34 from kallithea.model.meta import Session
34 from kallithea.model.meta import Session
35 from kallithea.model.scm import ScmModel
35 from kallithea.model.scm import ScmModel
36 from kallithea.model.gist import GistModel
36 from kallithea.model.gist import GistModel
37 from kallithea.model.db import Repository, User, Setting
37 from kallithea.model.db import Repository, User, Setting
38 from kallithea.lib.utils2 import time_to_datetime
38 from kallithea.lib.utils2 import time_to_datetime
39
39
40
40
41 API_URL = '/_admin/api'
41 API_URL = '/_admin/api'
42 TEST_USER_GROUP = 'test_user_group'
42 TEST_USER_GROUP = 'test_user_group'
43 TEST_REPO_GROUP = 'test_repo_group'
43 TEST_REPO_GROUP = 'test_repo_group'
44
44
45 fixture = Fixture()
45 fixture = Fixture()
46
46
47
47
48 def _build_data(apikey, method, **kw):
48 def _build_data(apikey, method, **kw):
49 """
49 """
50 Builds API data with given random ID
50 Builds API data with given random ID
51
51
52 :param random_id:
52 :param random_id:
53 """
53 """
54 random_id = random.randrange(1, 9999)
54 random_id = random.randrange(1, 9999)
55 return random_id, json.dumps({
55 return random_id, json.dumps({
56 "id": random_id,
56 "id": random_id,
57 "api_key": apikey,
57 "api_key": apikey,
58 "method": method,
58 "method": method,
59 "args": kw
59 "args": kw
60 })
60 })
61
61
62
62
63 jsonify = lambda obj: json.loads(json.dumps(obj))
63 jsonify = lambda obj: json.loads(json.dumps(obj))
64
64
65
65
66 def crash(*args, **kwargs):
66 def crash(*args, **kwargs):
67 raise Exception('Total Crash !')
67 raise Exception('Total Crash !')
68
68
69
69
70 def api_call(test_obj, params):
70 def api_call(test_obj, params):
71 response = test_obj.app.post(API_URL, content_type='application/json',
71 response = test_obj.app.post(API_URL, content_type='application/json',
72 params=params)
72 params=params)
73 return response
73 return response
74
74
75
75
76 ## helpers
76 ## helpers
77 def make_user_group(name=TEST_USER_GROUP):
77 def make_user_group(name=TEST_USER_GROUP):
78 gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
78 gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
79 UserGroupModel().add_user_to_group(user_group=gr,
79 UserGroupModel().add_user_to_group(user_group=gr,
80 user=TEST_USER_ADMIN_LOGIN)
80 user=TEST_USER_ADMIN_LOGIN)
81 Session().commit()
81 Session().commit()
82 return gr
82 return gr
83
83
84
84
85 def make_repo_group(name=TEST_REPO_GROUP):
85 def make_repo_group(name=TEST_REPO_GROUP):
86 gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
86 gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
87 Session().commit()
87 Session().commit()
88 return gr
88 return gr
89
89
90
90
91 class BaseTestApi(object):
91 class BaseTestApi(object):
92 REPO = None
92 REPO = None
93 REPO_TYPE = None
93 REPO_TYPE = None
94
94
95 @classmethod
95 @classmethod
96 def setup_class(cls):
96 def setup_class(cls):
97 cls.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
97 cls.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
98 cls.apikey = cls.usr.api_key
98 cls.apikey = cls.usr.api_key
99 cls.test_user = UserModel().create_or_update(
99 cls.test_user = UserModel().create_or_update(
100 username='test-api',
100 username='test-api',
101 password='test',
101 password='test',
102 email='test@example.com',
102 email='test@example.com',
103 firstname='first',
103 firstname='first',
104 lastname='last'
104 lastname='last'
105 )
105 )
106 Session().commit()
106 Session().commit()
107 cls.TEST_USER_LOGIN = cls.test_user.username
107 cls.TEST_USER_LOGIN = cls.test_user.username
108 cls.apikey_regular = cls.test_user.api_key
108 cls.apikey_regular = cls.test_user.api_key
109
109
110 @classmethod
110 @classmethod
111 def teardown_class(cls):
111 def teardown_class(cls):
112 pass
112 pass
113
113
114 def setUp(self):
114 def setUp(self):
115 self.maxDiff = None
115 self.maxDiff = None
116 make_user_group()
116 make_user_group()
117 make_repo_group()
117 make_repo_group()
118
118
119 def tearDown(self):
119 def tearDown(self):
120 fixture.destroy_user_group(TEST_USER_GROUP)
120 fixture.destroy_user_group(TEST_USER_GROUP)
121 fixture.destroy_gists()
121 fixture.destroy_gists()
122 fixture.destroy_repo_group(TEST_REPO_GROUP)
122 fixture.destroy_repo_group(TEST_REPO_GROUP)
123
123
124 def _compare_ok(self, id_, expected, given):
124 def _compare_ok(self, id_, expected, given):
125 expected = jsonify({
125 expected = jsonify({
126 'id': id_,
126 'id': id_,
127 'error': None,
127 'error': None,
128 'result': expected
128 'result': expected
129 })
129 })
130 given = json.loads(given)
130 given = json.loads(given)
131 self.assertEqual(expected, given)
131 self.assertEqual(expected, given)
132
132
133 def _compare_error(self, id_, expected, given):
133 def _compare_error(self, id_, expected, given):
134 expected = jsonify({
134 expected = jsonify({
135 'id': id_,
135 'id': id_,
136 'error': expected,
136 'error': expected,
137 'result': None
137 'result': None
138 })
138 })
139 given = json.loads(given)
139 given = json.loads(given)
140 self.assertEqual(expected, given)
140 self.assertEqual(expected, given)
141
141
142 def test_Optional_object(self):
142 def test_Optional_object(self):
143 from kallithea.controllers.api.api import Optional
143 from kallithea.controllers.api.api import Optional
144
144
145 option1 = Optional(None)
145 option1 = Optional(None)
146 self.assertEqual('<Optional:%s>' % None, repr(option1))
146 self.assertEqual('<Optional:%s>' % None, repr(option1))
147 self.assertEqual(option1(), None)
147 self.assertEqual(option1(), None)
148
148
149 self.assertEqual(1, Optional.extract(Optional(1)))
149 self.assertEqual(1, Optional.extract(Optional(1)))
150 self.assertEqual('trololo', Optional.extract('trololo'))
150 self.assertEqual('trololo', Optional.extract('trololo'))
151
151
152 def test_Optional_OAttr(self):
152 def test_Optional_OAttr(self):
153 from kallithea.controllers.api.api import Optional, OAttr
153 from kallithea.controllers.api.api import Optional, OAttr
154
154
155 option1 = Optional(OAttr('apiuser'))
155 option1 = Optional(OAttr('apiuser'))
156 self.assertEqual('apiuser', Optional.extract(option1))
156 self.assertEqual('apiuser', Optional.extract(option1))
157
157
158 def test_OAttr_object(self):
158 def test_OAttr_object(self):
159 from kallithea.controllers.api.api import OAttr
159 from kallithea.controllers.api.api import OAttr
160
160
161 oattr1 = OAttr('apiuser')
161 oattr1 = OAttr('apiuser')
162 self.assertEqual('<OptionalAttr:apiuser>', repr(oattr1))
162 self.assertEqual('<OptionalAttr:apiuser>', repr(oattr1))
163 self.assertEqual(oattr1(), oattr1)
163 self.assertEqual(oattr1(), oattr1)
164
164
165 def test_api_wrong_key(self):
165 def test_api_wrong_key(self):
166 id_, params = _build_data('trololo', 'get_user')
166 id_, params = _build_data('trololo', 'get_user')
167 response = api_call(self, params)
167 response = api_call(self, params)
168
168
169 expected = 'Invalid API KEY'
169 expected = 'Invalid API KEY'
170 self._compare_error(id_, expected, given=response.body)
170 self._compare_error(id_, expected, given=response.body)
171
171
172 def test_api_missing_non_optional_param(self):
172 def test_api_missing_non_optional_param(self):
173 id_, params = _build_data(self.apikey, 'get_repo')
173 id_, params = _build_data(self.apikey, 'get_repo')
174 response = api_call(self, params)
174 response = api_call(self, params)
175
175
176 expected = 'Missing non optional `repoid` arg in JSON DATA'
176 expected = 'Missing non optional `repoid` arg in JSON DATA'
177 self._compare_error(id_, expected, given=response.body)
177 self._compare_error(id_, expected, given=response.body)
178
178
179 def test_api_missing_non_optional_param_args_null(self):
179 def test_api_missing_non_optional_param_args_null(self):
180 id_, params = _build_data(self.apikey, 'get_repo')
180 id_, params = _build_data(self.apikey, 'get_repo')
181 params = params.replace('"args": {}', '"args": null')
181 params = params.replace('"args": {}', '"args": null')
182 response = api_call(self, params)
182 response = api_call(self, params)
183
183
184 expected = 'Missing non optional `repoid` arg in JSON DATA'
184 expected = 'Missing non optional `repoid` arg in JSON DATA'
185 self._compare_error(id_, expected, given=response.body)
185 self._compare_error(id_, expected, given=response.body)
186
186
187 def test_api_missing_non_optional_param_args_bad(self):
187 def test_api_missing_non_optional_param_args_bad(self):
188 id_, params = _build_data(self.apikey, 'get_repo')
188 id_, params = _build_data(self.apikey, 'get_repo')
189 params = params.replace('"args": {}', '"args": 1')
189 params = params.replace('"args": {}', '"args": 1')
190 response = api_call(self, params)
190 response = api_call(self, params)
191
191
192 expected = 'Missing non optional `repoid` arg in JSON DATA'
192 expected = 'Missing non optional `repoid` arg in JSON DATA'
193 self._compare_error(id_, expected, given=response.body)
193 self._compare_error(id_, expected, given=response.body)
194
194
195 def test_api_args_is_null(self):
195 def test_api_args_is_null(self):
196 id_, params = _build_data(self.apikey, 'get_users', )
196 id_, params = _build_data(self.apikey, 'get_users', )
197 params = params.replace('"args": {}', '"args": null')
197 params = params.replace('"args": {}', '"args": null')
198 response = api_call(self, params)
198 response = api_call(self, params)
199 self.assertEqual(response.status, '200 OK')
199 self.assertEqual(response.status, '200 OK')
200
200
201 def test_api_args_is_bad(self):
201 def test_api_args_is_bad(self):
202 id_, params = _build_data(self.apikey, 'get_users', )
202 id_, params = _build_data(self.apikey, 'get_users', )
203 params = params.replace('"args": {}', '"args": 1')
203 params = params.replace('"args": {}', '"args": 1')
204 response = api_call(self, params)
204 response = api_call(self, params)
205 self.assertEqual(response.status, '200 OK')
205 self.assertEqual(response.status, '200 OK')
206
206
207 def test_api_args_different_args(self):
207 def test_api_args_different_args(self):
208 import string
208 import string
209 expected = {
209 expected = {
210 'ascii_letters': string.ascii_letters,
210 'ascii_letters': string.ascii_letters,
211 'ws': string.whitespace,
211 'ws': string.whitespace,
212 'printables': string.printable
212 'printables': string.printable
213 }
213 }
214 id_, params = _build_data(self.apikey, 'test', args=expected)
214 id_, params = _build_data(self.apikey, 'test', args=expected)
215 response = api_call(self, params)
215 response = api_call(self, params)
216 self.assertEqual(response.status, '200 OK')
216 self.assertEqual(response.status, '200 OK')
217 self._compare_ok(id_, expected, response.body)
217 self._compare_ok(id_, expected, response.body)
218
218
219 def test_api_get_users(self):
219 def test_api_get_users(self):
220 id_, params = _build_data(self.apikey, 'get_users', )
220 id_, params = _build_data(self.apikey, 'get_users', )
221 response = api_call(self, params)
221 response = api_call(self, params)
222 ret_all = []
222 ret_all = []
223 _users = User.query().filter(User.username != User.DEFAULT_USER) \
223 _users = User.query().filter(User.username != User.DEFAULT_USER) \
224 .order_by(User.username).all()
224 .order_by(User.username).all()
225 for usr in _users:
225 for usr in _users:
226 ret = usr.get_api_data()
226 ret = usr.get_api_data()
227 ret_all.append(jsonify(ret))
227 ret_all.append(jsonify(ret))
228 expected = ret_all
228 expected = ret_all
229 self._compare_ok(id_, expected, given=response.body)
229 self._compare_ok(id_, expected, given=response.body)
230
230
231 def test_api_get_user(self):
231 def test_api_get_user(self):
232 id_, params = _build_data(self.apikey, 'get_user',
232 id_, params = _build_data(self.apikey, 'get_user',
233 userid=TEST_USER_ADMIN_LOGIN)
233 userid=TEST_USER_ADMIN_LOGIN)
234 response = api_call(self, params)
234 response = api_call(self, params)
235
235
236 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
236 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
237 ret = usr.get_api_data()
237 ret = usr.get_api_data()
238 ret['permissions'] = AuthUser(usr.user_id).permissions
238 ret['permissions'] = AuthUser(usr.user_id).permissions
239
239
240 expected = ret
240 expected = ret
241 self._compare_ok(id_, expected, given=response.body)
241 self._compare_ok(id_, expected, given=response.body)
242
242
243 def test_api_get_user_that_does_not_exist(self):
243 def test_api_get_user_that_does_not_exist(self):
244 id_, params = _build_data(self.apikey, 'get_user',
244 id_, params = _build_data(self.apikey, 'get_user',
245 userid='trololo')
245 userid='trololo')
246 response = api_call(self, params)
246 response = api_call(self, params)
247
247
248 expected = "user `%s` does not exist" % 'trololo'
248 expected = "user `%s` does not exist" % 'trololo'
249 self._compare_error(id_, expected, given=response.body)
249 self._compare_error(id_, expected, given=response.body)
250
250
251 def test_api_get_user_without_giving_userid(self):
251 def test_api_get_user_without_giving_userid(self):
252 id_, params = _build_data(self.apikey, 'get_user')
252 id_, params = _build_data(self.apikey, 'get_user')
253 response = api_call(self, params)
253 response = api_call(self, params)
254
254
255 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
255 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
256 ret = usr.get_api_data()
256 ret = usr.get_api_data()
257 ret['permissions'] = AuthUser(usr.user_id).permissions
257 ret['permissions'] = AuthUser(usr.user_id).permissions
258
258
259 expected = ret
259 expected = ret
260 self._compare_ok(id_, expected, given=response.body)
260 self._compare_ok(id_, expected, given=response.body)
261
261
262 def test_api_get_user_without_giving_userid_non_admin(self):
262 def test_api_get_user_without_giving_userid_non_admin(self):
263 id_, params = _build_data(self.apikey_regular, 'get_user')
263 id_, params = _build_data(self.apikey_regular, 'get_user')
264 response = api_call(self, params)
264 response = api_call(self, params)
265
265
266 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
266 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
267 ret = usr.get_api_data()
267 ret = usr.get_api_data()
268 ret['permissions'] = AuthUser(usr.user_id).permissions
268 ret['permissions'] = AuthUser(usr.user_id).permissions
269
269
270 expected = ret
270 expected = ret
271 self._compare_ok(id_, expected, given=response.body)
271 self._compare_ok(id_, expected, given=response.body)
272
272
273 def test_api_get_user_with_giving_userid_non_admin(self):
273 def test_api_get_user_with_giving_userid_non_admin(self):
274 id_, params = _build_data(self.apikey_regular, 'get_user',
274 id_, params = _build_data(self.apikey_regular, 'get_user',
275 userid=self.TEST_USER_LOGIN)
275 userid=self.TEST_USER_LOGIN)
276 response = api_call(self, params)
276 response = api_call(self, params)
277
277
278 expected = 'userid is not the same as your user'
278 expected = 'userid is not the same as your user'
279 self._compare_error(id_, expected, given=response.body)
279 self._compare_error(id_, expected, given=response.body)
280
280
281 def test_api_pull(self):
281 def test_api_pull(self):
282 repo_name = 'test_pull'
282 repo_name = 'test_pull'
283 r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
283 r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
284 r.clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
284 r.clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
285 Session.add(r)
285 Session.add(r)
286 Session.commit()
286 Session.commit()
287
287
288 id_, params = _build_data(self.apikey, 'pull',
288 id_, params = _build_data(self.apikey, 'pull',
289 repoid=repo_name,)
289 repoid=repo_name,)
290 response = api_call(self, params)
290 response = api_call(self, params)
291
291
292 expected = {'msg': 'Pulled from `%s`' % repo_name,
292 expected = {'msg': 'Pulled from `%s`' % repo_name,
293 'repository': repo_name}
293 'repository': repo_name}
294 self._compare_ok(id_, expected, given=response.body)
294 self._compare_ok(id_, expected, given=response.body)
295
295
296 fixture.destroy_repo(repo_name)
296 fixture.destroy_repo(repo_name)
297
297
298 def test_api_pull_error(self):
298 def test_api_pull_error(self):
299 id_, params = _build_data(self.apikey, 'pull',
299 id_, params = _build_data(self.apikey, 'pull',
300 repoid=self.REPO, )
300 repoid=self.REPO, )
301 response = api_call(self, params)
301 response = api_call(self, params)
302
302
303 expected = 'Unable to pull changes from `%s`' % self.REPO
303 expected = 'Unable to pull changes from `%s`' % self.REPO
304 self._compare_error(id_, expected, given=response.body)
304 self._compare_error(id_, expected, given=response.body)
305
305
306 def test_api_rescan_repos(self):
306 def test_api_rescan_repos(self):
307 id_, params = _build_data(self.apikey, 'rescan_repos')
307 id_, params = _build_data(self.apikey, 'rescan_repos')
308 response = api_call(self, params)
308 response = api_call(self, params)
309
309
310 expected = {'added': [], 'removed': []}
310 expected = {'added': [], 'removed': []}
311 self._compare_ok(id_, expected, given=response.body)
311 self._compare_ok(id_, expected, given=response.body)
312
312
313 @mock.patch.object(ScmModel, 'repo_scan', crash)
313 @mock.patch.object(ScmModel, 'repo_scan', crash)
314 def test_api_rescann_error(self):
314 def test_api_rescann_error(self):
315 id_, params = _build_data(self.apikey, 'rescan_repos', )
315 id_, params = _build_data(self.apikey, 'rescan_repos', )
316 response = api_call(self, params)
316 response = api_call(self, params)
317
317
318 expected = 'Error occurred during rescan repositories action'
318 expected = 'Error occurred during rescan repositories action'
319 self._compare_error(id_, expected, given=response.body)
319 self._compare_error(id_, expected, given=response.body)
320
320
321 def test_api_invalidate_cache(self):
321 def test_api_invalidate_cache(self):
322 repo = RepoModel().get_by_repo_name(self.REPO)
322 repo = RepoModel().get_by_repo_name(self.REPO)
323 repo.scm_instance_cached() # seed cache
323 repo.scm_instance_cached() # seed cache
324
324
325 id_, params = _build_data(self.apikey, 'invalidate_cache',
325 id_, params = _build_data(self.apikey, 'invalidate_cache',
326 repoid=self.REPO)
326 repoid=self.REPO)
327 response = api_call(self, params)
327 response = api_call(self, params)
328
328
329 expected = {
329 expected = {
330 'msg': "Cache for repository `%s` was invalidated" % (self.REPO,),
330 'msg': "Cache for repository `%s` was invalidated" % (self.REPO,),
331 'repository': self.REPO
331 'repository': self.REPO
332 }
332 }
333 self._compare_ok(id_, expected, given=response.body)
333 self._compare_ok(id_, expected, given=response.body)
334
334
335 @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
335 @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
336 def test_api_invalidate_cache_error(self):
336 def test_api_invalidate_cache_error(self):
337 id_, params = _build_data(self.apikey, 'invalidate_cache',
337 id_, params = _build_data(self.apikey, 'invalidate_cache',
338 repoid=self.REPO)
338 repoid=self.REPO)
339 response = api_call(self, params)
339 response = api_call(self, params)
340
340
341 expected = 'Error occurred during cache invalidation action'
341 expected = 'Error occurred during cache invalidation action'
342 self._compare_error(id_, expected, given=response.body)
342 self._compare_error(id_, expected, given=response.body)
343
343
344 def test_api_invalidate_cache_regular_user_no_permission(self):
344 def test_api_invalidate_cache_regular_user_no_permission(self):
345 repo = RepoModel().get_by_repo_name(self.REPO)
345 repo = RepoModel().get_by_repo_name(self.REPO)
346 repo.scm_instance_cached() # seed cache
346 repo.scm_instance_cached() # seed cache
347
347
348 id_, params = _build_data(self.apikey_regular, 'invalidate_cache',
348 id_, params = _build_data(self.apikey_regular, 'invalidate_cache',
349 repoid=self.REPO)
349 repoid=self.REPO)
350 response = api_call(self, params)
350 response = api_call(self, params)
351
351
352 expected = "repository `%s` does not exist" % (self.REPO,)
352 expected = "repository `%s` does not exist" % (self.REPO,)
353 self._compare_error(id_, expected, given=response.body)
353 self._compare_error(id_, expected, given=response.body)
354
354
355 def test_api_lock_repo_lock_aquire(self):
355 def test_api_lock_repo_lock_aquire(self):
356 id_, params = _build_data(self.apikey, 'lock',
356 id_, params = _build_data(self.apikey, 'lock',
357 userid=TEST_USER_ADMIN_LOGIN,
357 userid=TEST_USER_ADMIN_LOGIN,
358 repoid=self.REPO,
358 repoid=self.REPO,
359 locked=True)
359 locked=True)
360 response = api_call(self, params)
360 response = api_call(self, params)
361 expected = {
361 expected = {
362 'repo': self.REPO, 'locked': True,
362 'repo': self.REPO, 'locked': True,
363 'locked_since': response.json['result']['locked_since'],
363 'locked_since': response.json['result']['locked_since'],
364 'locked_by': TEST_USER_ADMIN_LOGIN,
364 'locked_by': TEST_USER_ADMIN_LOGIN,
365 'lock_state_changed': True,
365 'lock_state_changed': True,
366 'msg': ('User `%s` set lock state for repo `%s` to `%s`'
366 'msg': ('User `%s` set lock state for repo `%s` to `%s`'
367 % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
367 % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
368 }
368 }
369 self._compare_ok(id_, expected, given=response.body)
369 self._compare_ok(id_, expected, given=response.body)
370
370
371 def test_api_lock_repo_lock_aquire_by_non_admin(self):
371 def test_api_lock_repo_lock_aquire_by_non_admin(self):
372 repo_name = 'api_delete_me'
372 repo_name = 'api_delete_me'
373 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
373 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
374 cur_user=self.TEST_USER_LOGIN)
374 cur_user=self.TEST_USER_LOGIN)
375 try:
375 try:
376 id_, params = _build_data(self.apikey_regular, 'lock',
376 id_, params = _build_data(self.apikey_regular, 'lock',
377 repoid=repo_name,
377 repoid=repo_name,
378 locked=True)
378 locked=True)
379 response = api_call(self, params)
379 response = api_call(self, params)
380 expected = {
380 expected = {
381 'repo': repo_name,
381 'repo': repo_name,
382 'locked': True,
382 'locked': True,
383 'locked_since': response.json['result']['locked_since'],
383 'locked_since': response.json['result']['locked_since'],
384 'locked_by': self.TEST_USER_LOGIN,
384 'locked_by': self.TEST_USER_LOGIN,
385 'lock_state_changed': True,
385 'lock_state_changed': True,
386 'msg': ('User `%s` set lock state for repo `%s` to `%s`'
386 'msg': ('User `%s` set lock state for repo `%s` to `%s`'
387 % (self.TEST_USER_LOGIN, repo_name, True))
387 % (self.TEST_USER_LOGIN, repo_name, True))
388 }
388 }
389 self._compare_ok(id_, expected, given=response.body)
389 self._compare_ok(id_, expected, given=response.body)
390 finally:
390 finally:
391 fixture.destroy_repo(repo_name)
391 fixture.destroy_repo(repo_name)
392
392
393 def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
393 def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
394 repo_name = 'api_delete_me'
394 repo_name = 'api_delete_me'
395 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
395 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
396 cur_user=self.TEST_USER_LOGIN)
396 cur_user=self.TEST_USER_LOGIN)
397 try:
397 try:
398 id_, params = _build_data(self.apikey_regular, 'lock',
398 id_, params = _build_data(self.apikey_regular, 'lock',
399 userid=TEST_USER_ADMIN_LOGIN,
399 userid=TEST_USER_ADMIN_LOGIN,
400 repoid=repo_name,
400 repoid=repo_name,
401 locked=True)
401 locked=True)
402 response = api_call(self, params)
402 response = api_call(self, params)
403 expected = 'userid is not the same as your user'
403 expected = 'userid is not the same as your user'
404 self._compare_error(id_, expected, given=response.body)
404 self._compare_error(id_, expected, given=response.body)
405 finally:
405 finally:
406 fixture.destroy_repo(repo_name)
406 fixture.destroy_repo(repo_name)
407
407
408 def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
408 def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
409 id_, params = _build_data(self.apikey_regular, 'lock',
409 id_, params = _build_data(self.apikey_regular, 'lock',
410 repoid=self.REPO,
410 repoid=self.REPO,
411 locked=True)
411 locked=True)
412 response = api_call(self, params)
412 response = api_call(self, params)
413 expected = 'repository `%s` does not exist' % (self.REPO)
413 expected = 'repository `%s` does not exist' % (self.REPO)
414 self._compare_error(id_, expected, given=response.body)
414 self._compare_error(id_, expected, given=response.body)
415
415
416 def test_api_lock_repo_lock_release(self):
416 def test_api_lock_repo_lock_release(self):
417 id_, params = _build_data(self.apikey, 'lock',
417 id_, params = _build_data(self.apikey, 'lock',
418 userid=TEST_USER_ADMIN_LOGIN,
418 userid=TEST_USER_ADMIN_LOGIN,
419 repoid=self.REPO,
419 repoid=self.REPO,
420 locked=False)
420 locked=False)
421 response = api_call(self, params)
421 response = api_call(self, params)
422 expected = {
422 expected = {
423 'repo': self.REPO,
423 'repo': self.REPO,
424 'locked': False,
424 'locked': False,
425 'locked_since': None,
425 'locked_since': None,
426 'locked_by': TEST_USER_ADMIN_LOGIN,
426 'locked_by': TEST_USER_ADMIN_LOGIN,
427 'lock_state_changed': True,
427 'lock_state_changed': True,
428 'msg': ('User `%s` set lock state for repo `%s` to `%s`'
428 'msg': ('User `%s` set lock state for repo `%s` to `%s`'
429 % (TEST_USER_ADMIN_LOGIN, self.REPO, False))
429 % (TEST_USER_ADMIN_LOGIN, self.REPO, False))
430 }
430 }
431 self._compare_ok(id_, expected, given=response.body)
431 self._compare_ok(id_, expected, given=response.body)
432
432
433 def test_api_lock_repo_lock_aquire_optional_userid(self):
433 def test_api_lock_repo_lock_aquire_optional_userid(self):
434 id_, params = _build_data(self.apikey, 'lock',
434 id_, params = _build_data(self.apikey, 'lock',
435 repoid=self.REPO,
435 repoid=self.REPO,
436 locked=True)
436 locked=True)
437 response = api_call(self, params)
437 response = api_call(self, params)
438 time_ = response.json['result']['locked_since']
438 time_ = response.json['result']['locked_since']
439 expected = {
439 expected = {
440 'repo': self.REPO,
440 'repo': self.REPO,
441 'locked': True,
441 'locked': True,
442 'locked_since': time_,
442 'locked_since': time_,
443 'locked_by': TEST_USER_ADMIN_LOGIN,
443 'locked_by': TEST_USER_ADMIN_LOGIN,
444 'lock_state_changed': True,
444 'lock_state_changed': True,
445 'msg': ('User `%s` set lock state for repo `%s` to `%s`'
445 'msg': ('User `%s` set lock state for repo `%s` to `%s`'
446 % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
446 % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
447 }
447 }
448
448
449 self._compare_ok(id_, expected, given=response.body)
449 self._compare_ok(id_, expected, given=response.body)
450
450
451 def test_api_lock_repo_lock_optional_locked(self):
451 def test_api_lock_repo_lock_optional_locked(self):
452 id_, params = _build_data(self.apikey, 'lock',
452 id_, params = _build_data(self.apikey, 'lock',
453 repoid=self.REPO)
453 repoid=self.REPO)
454 response = api_call(self, params)
454 response = api_call(self, params)
455 time_ = response.json['result']['locked_since']
455 time_ = response.json['result']['locked_since']
456 expected = {
456 expected = {
457 'repo': self.REPO,
457 'repo': self.REPO,
458 'locked': True,
458 'locked': True,
459 'locked_since': time_,
459 'locked_since': time_,
460 'locked_by': TEST_USER_ADMIN_LOGIN,
460 'locked_by': TEST_USER_ADMIN_LOGIN,
461 'lock_state_changed': False,
461 'lock_state_changed': False,
462 'msg': ('Repo `%s` locked by `%s` on `%s`.'
462 'msg': ('Repo `%s` locked by `%s` on `%s`.'
463 % (self.REPO, TEST_USER_ADMIN_LOGIN,
463 % (self.REPO, TEST_USER_ADMIN_LOGIN,
464 json.dumps(time_to_datetime(time_))))
464 json.dumps(time_to_datetime(time_))))
465 }
465 }
466 self._compare_ok(id_, expected, given=response.body)
466 self._compare_ok(id_, expected, given=response.body)
467
467
468 def test_api_lock_repo_lock_optional_not_locked(self):
468 def test_api_lock_repo_lock_optional_not_locked(self):
469 repo_name = 'api_not_locked'
469 repo_name = 'api_not_locked'
470 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
470 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
471 cur_user=self.TEST_USER_LOGIN)
471 cur_user=self.TEST_USER_LOGIN)
472 self.assertEqual(repo.locked, [None, None])
472 self.assertEqual(repo.locked, [None, None])
473 try:
473 try:
474 id_, params = _build_data(self.apikey, 'lock',
474 id_, params = _build_data(self.apikey, 'lock',
475 repoid=repo.repo_id)
475 repoid=repo.repo_id)
476 response = api_call(self, params)
476 response = api_call(self, params)
477 expected = {
477 expected = {
478 'repo': repo_name,
478 'repo': repo_name,
479 'locked': False,
479 'locked': False,
480 'locked_since': None,
480 'locked_since': None,
481 'locked_by': None,
481 'locked_by': None,
482 'lock_state_changed': False,
482 'lock_state_changed': False,
483 'msg': ('Repo `%s` not locked.' % (repo_name,))
483 'msg': ('Repo `%s` not locked.' % (repo_name,))
484 }
484 }
485 self._compare_ok(id_, expected, given=response.body)
485 self._compare_ok(id_, expected, given=response.body)
486 finally:
486 finally:
487 fixture.destroy_repo(repo_name)
487 fixture.destroy_repo(repo_name)
488
488
489 @mock.patch.object(Repository, 'lock', crash)
489 @mock.patch.object(Repository, 'lock', crash)
490 def test_api_lock_error(self):
490 def test_api_lock_error(self):
491 id_, params = _build_data(self.apikey, 'lock',
491 id_, params = _build_data(self.apikey, 'lock',
492 userid=TEST_USER_ADMIN_LOGIN,
492 userid=TEST_USER_ADMIN_LOGIN,
493 repoid=self.REPO,
493 repoid=self.REPO,
494 locked=True)
494 locked=True)
495 response = api_call(self, params)
495 response = api_call(self, params)
496
496
497 expected = 'Error occurred locking repository `%s`' % self.REPO
497 expected = 'Error occurred locking repository `%s`' % self.REPO
498 self._compare_error(id_, expected, given=response.body)
498 self._compare_error(id_, expected, given=response.body)
499
499
500 def test_api_get_locks_regular_user(self):
500 def test_api_get_locks_regular_user(self):
501 id_, params = _build_data(self.apikey_regular, 'get_locks')
501 id_, params = _build_data(self.apikey_regular, 'get_locks')
502 response = api_call(self, params)
502 response = api_call(self, params)
503 expected = []
503 expected = []
504 self._compare_ok(id_, expected, given=response.body)
504 self._compare_ok(id_, expected, given=response.body)
505
505
506 def test_api_get_locks_with_userid_regular_user(self):
506 def test_api_get_locks_with_userid_regular_user(self):
507 id_, params = _build_data(self.apikey_regular, 'get_locks',
507 id_, params = _build_data(self.apikey_regular, 'get_locks',
508 userid=TEST_USER_ADMIN_LOGIN)
508 userid=TEST_USER_ADMIN_LOGIN)
509 response = api_call(self, params)
509 response = api_call(self, params)
510 expected = 'userid is not the same as your user'
510 expected = 'userid is not the same as your user'
511 self._compare_error(id_, expected, given=response.body)
511 self._compare_error(id_, expected, given=response.body)
512
512
513 def test_api_get_locks(self):
513 def test_api_get_locks(self):
514 id_, params = _build_data(self.apikey, 'get_locks')
514 id_, params = _build_data(self.apikey, 'get_locks')
515 response = api_call(self, params)
515 response = api_call(self, params)
516 expected = []
516 expected = []
517 self._compare_ok(id_, expected, given=response.body)
517 self._compare_ok(id_, expected, given=response.body)
518
518
519 def test_api_get_locks_with_one_locked_repo(self):
519 def test_api_get_locks_with_one_locked_repo(self):
520 repo_name = 'api_delete_me'
520 repo_name = 'api_delete_me'
521 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
521 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
522 cur_user=self.TEST_USER_LOGIN)
522 cur_user=self.TEST_USER_LOGIN)
523 Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
523 Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
524 try:
524 try:
525 id_, params = _build_data(self.apikey, 'get_locks')
525 id_, params = _build_data(self.apikey, 'get_locks')
526 response = api_call(self, params)
526 response = api_call(self, params)
527 expected = [repo.get_api_data()]
527 expected = [repo.get_api_data()]
528 self._compare_ok(id_, expected, given=response.body)
528 self._compare_ok(id_, expected, given=response.body)
529 finally:
529 finally:
530 fixture.destroy_repo(repo_name)
530 fixture.destroy_repo(repo_name)
531
531
532 def test_api_get_locks_with_one_locked_repo_for_specific_user(self):
532 def test_api_get_locks_with_one_locked_repo_for_specific_user(self):
533 repo_name = 'api_delete_me'
533 repo_name = 'api_delete_me'
534 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
534 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
535 cur_user=self.TEST_USER_LOGIN)
535 cur_user=self.TEST_USER_LOGIN)
536 Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
536 Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
537 try:
537 try:
538 id_, params = _build_data(self.apikey, 'get_locks',
538 id_, params = _build_data(self.apikey, 'get_locks',
539 userid=self.TEST_USER_LOGIN)
539 userid=self.TEST_USER_LOGIN)
540 response = api_call(self, params)
540 response = api_call(self, params)
541 expected = [repo.get_api_data()]
541 expected = [repo.get_api_data()]
542 self._compare_ok(id_, expected, given=response.body)
542 self._compare_ok(id_, expected, given=response.body)
543 finally:
543 finally:
544 fixture.destroy_repo(repo_name)
544 fixture.destroy_repo(repo_name)
545
545
546 def test_api_get_locks_with_userid(self):
546 def test_api_get_locks_with_userid(self):
547 id_, params = _build_data(self.apikey, 'get_locks',
547 id_, params = _build_data(self.apikey, 'get_locks',
548 userid=TEST_USER_REGULAR_LOGIN)
548 userid=TEST_USER_REGULAR_LOGIN)
549 response = api_call(self, params)
549 response = api_call(self, params)
550 expected = []
550 expected = []
551 self._compare_ok(id_, expected, given=response.body)
551 self._compare_ok(id_, expected, given=response.body)
552
552
553 def test_api_create_existing_user(self):
553 def test_api_create_existing_user(self):
554 id_, params = _build_data(self.apikey, 'create_user',
554 id_, params = _build_data(self.apikey, 'create_user',
555 username=TEST_USER_ADMIN_LOGIN,
555 username=TEST_USER_ADMIN_LOGIN,
556 email='test@foo.com',
556 email='test@foo.com',
557 password='trololo')
557 password='trololo')
558 response = api_call(self, params)
558 response = api_call(self, params)
559
559
560 expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
560 expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
561 self._compare_error(id_, expected, given=response.body)
561 self._compare_error(id_, expected, given=response.body)
562
562
563 def test_api_create_user_with_existing_email(self):
563 def test_api_create_user_with_existing_email(self):
564 id_, params = _build_data(self.apikey, 'create_user',
564 id_, params = _build_data(self.apikey, 'create_user',
565 username=TEST_USER_ADMIN_LOGIN + 'new',
565 username=TEST_USER_ADMIN_LOGIN + 'new',
566 email=TEST_USER_REGULAR_EMAIL,
566 email=TEST_USER_REGULAR_EMAIL,
567 password='trololo')
567 password='trololo')
568 response = api_call(self, params)
568 response = api_call(self, params)
569
569
570 expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
570 expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
571 self._compare_error(id_, expected, given=response.body)
571 self._compare_error(id_, expected, given=response.body)
572
572
573 def test_api_create_user(self):
573 def test_api_create_user(self):
574 username = 'test_new_api_user'
574 username = 'test_new_api_user'
575 email = username + "@foo.com"
575 email = username + "@foo.com"
576
576
577 id_, params = _build_data(self.apikey, 'create_user',
577 id_, params = _build_data(self.apikey, 'create_user',
578 username=username,
578 username=username,
579 email=email,
579 email=email,
580 password='trololo')
580 password='trololo')
581 response = api_call(self, params)
581 response = api_call(self, params)
582
582
583 usr = UserModel().get_by_username(username)
583 usr = UserModel().get_by_username(username)
584 ret = dict(
584 ret = dict(
585 msg='created new user `%s`' % username,
585 msg='created new user `%s`' % username,
586 user=jsonify(usr.get_api_data())
586 user=jsonify(usr.get_api_data())
587 )
587 )
588
588
589 try:
589 try:
590 expected = ret
590 expected = ret
591 self._compare_ok(id_, expected, given=response.body)
591 self._compare_ok(id_, expected, given=response.body)
592 finally:
592 finally:
593 fixture.destroy_user(usr.user_id)
593 fixture.destroy_user(usr.user_id)
594
594
595 def test_api_create_user_without_password(self):
595 def test_api_create_user_without_password(self):
596 username = 'test_new_api_user_passwordless'
596 username = 'test_new_api_user_passwordless'
597 email = username + "@foo.com"
597 email = username + "@foo.com"
598
598
599 id_, params = _build_data(self.apikey, 'create_user',
599 id_, params = _build_data(self.apikey, 'create_user',
600 username=username,
600 username=username,
601 email=email)
601 email=email)
602 response = api_call(self, params)
602 response = api_call(self, params)
603
603
604 usr = UserModel().get_by_username(username)
604 usr = UserModel().get_by_username(username)
605 ret = dict(
605 ret = dict(
606 msg='created new user `%s`' % username,
606 msg='created new user `%s`' % username,
607 user=jsonify(usr.get_api_data())
607 user=jsonify(usr.get_api_data())
608 )
608 )
609 try:
609 try:
610 expected = ret
610 expected = ret
611 self._compare_ok(id_, expected, given=response.body)
611 self._compare_ok(id_, expected, given=response.body)
612 finally:
612 finally:
613 fixture.destroy_user(usr.user_id)
613 fixture.destroy_user(usr.user_id)
614
614
615 def test_api_create_user_with_extern_name(self):
615 def test_api_create_user_with_extern_name(self):
616 username = 'test_new_api_user_passwordless'
616 username = 'test_new_api_user_passwordless'
617 email = username + "@foo.com"
617 email = username + "@foo.com"
618
618
619 id_, params = _build_data(self.apikey, 'create_user',
619 id_, params = _build_data(self.apikey, 'create_user',
620 username=username,
620 username=username,
621 email=email, extern_name='internal')
621 email=email, extern_name='internal')
622 response = api_call(self, params)
622 response = api_call(self, params)
623
623
624 usr = UserModel().get_by_username(username)
624 usr = UserModel().get_by_username(username)
625 ret = dict(
625 ret = dict(
626 msg='created new user `%s`' % username,
626 msg='created new user `%s`' % username,
627 user=jsonify(usr.get_api_data())
627 user=jsonify(usr.get_api_data())
628 )
628 )
629 try:
629 try:
630 expected = ret
630 expected = ret
631 self._compare_ok(id_, expected, given=response.body)
631 self._compare_ok(id_, expected, given=response.body)
632 finally:
632 finally:
633 fixture.destroy_user(usr.user_id)
633 fixture.destroy_user(usr.user_id)
634
634
635 @mock.patch.object(UserModel, 'create_or_update', crash)
635 @mock.patch.object(UserModel, 'create_or_update', crash)
636 def test_api_create_user_when_exception_happened(self):
636 def test_api_create_user_when_exception_happened(self):
637
637
638 username = 'test_new_api_user'
638 username = 'test_new_api_user'
639 email = username + "@foo.com"
639 email = username + "@foo.com"
640
640
641 id_, params = _build_data(self.apikey, 'create_user',
641 id_, params = _build_data(self.apikey, 'create_user',
642 username=username,
642 username=username,
643 email=email,
643 email=email,
644 password='trololo')
644 password='trololo')
645 response = api_call(self, params)
645 response = api_call(self, params)
646 expected = 'failed to create user `%s`' % username
646 expected = 'failed to create user `%s`' % username
647 self._compare_error(id_, expected, given=response.body)
647 self._compare_error(id_, expected, given=response.body)
648
648
649 def test_api_delete_user(self):
649 def test_api_delete_user(self):
650 usr = UserModel().create_or_update(username=u'test_user',
650 usr = UserModel().create_or_update(username=u'test_user',
651 password=u'qweqwe',
651 password=u'qweqwe',
652 email=u'u232@example.com',
652 email=u'u232@example.com',
653 firstname=u'u1', lastname=u'u1')
653 firstname=u'u1', lastname=u'u1')
654 Session().commit()
654 Session().commit()
655 username = usr.username
655 username = usr.username
656 email = usr.email
656 email = usr.email
657 usr_id = usr.user_id
657 usr_id = usr.user_id
658 ## DELETE THIS USER NOW
658 ## DELETE THIS USER NOW
659
659
660 id_, params = _build_data(self.apikey, 'delete_user',
660 id_, params = _build_data(self.apikey, 'delete_user',
661 userid=username, )
661 userid=username, )
662 response = api_call(self, params)
662 response = api_call(self, params)
663
663
664 ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
664 ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
665 'user': None}
665 'user': None}
666 expected = ret
666 expected = ret
667 self._compare_ok(id_, expected, given=response.body)
667 self._compare_ok(id_, expected, given=response.body)
668
668
669 @mock.patch.object(UserModel, 'delete', crash)
669 @mock.patch.object(UserModel, 'delete', crash)
670 def test_api_delete_user_when_exception_happened(self):
670 def test_api_delete_user_when_exception_happened(self):
671 usr = UserModel().create_or_update(username=u'test_user',
671 usr = UserModel().create_or_update(username=u'test_user',
672 password=u'qweqwe',
672 password=u'qweqwe',
673 email=u'u232@example.com',
673 email=u'u232@example.com',
674 firstname=u'u1', lastname=u'u1')
674 firstname=u'u1', lastname=u'u1')
675 Session().commit()
675 Session().commit()
676 username = usr.username
676 username = usr.username
677
677
678 id_, params = _build_data(self.apikey, 'delete_user',
678 id_, params = _build_data(self.apikey, 'delete_user',
679 userid=username, )
679 userid=username, )
680 response = api_call(self, params)
680 response = api_call(self, params)
681 ret = 'failed to delete user ID:%s %s' % (usr.user_id,
681 ret = 'failed to delete user ID:%s %s' % (usr.user_id,
682 usr.username)
682 usr.username)
683 expected = ret
683 expected = ret
684 self._compare_error(id_, expected, given=response.body)
684 self._compare_error(id_, expected, given=response.body)
685
685
686 @parameterized.expand([('firstname', 'new_username'),
686 @parameterized.expand([('firstname', 'new_username'),
687 ('lastname', 'new_username'),
687 ('lastname', 'new_username'),
688 ('email', 'new_username'),
688 ('email', 'new_username'),
689 ('admin', True),
689 ('admin', True),
690 ('admin', False),
690 ('admin', False),
691 ('extern_type', 'ldap'),
691 ('extern_type', 'ldap'),
692 ('extern_type', None),
692 ('extern_type', None),
693 ('extern_name', 'test'),
693 ('extern_name', 'test'),
694 ('extern_name', None),
694 ('extern_name', None),
695 ('active', False),
695 ('active', False),
696 ('active', True),
696 ('active', True),
697 ('password', 'newpass')
697 ('password', 'newpass')
698 ])
698 ])
699 def test_api_update_user(self, name, expected):
699 def test_api_update_user(self, name, expected):
700 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
700 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
701 kw = {name: expected,
701 kw = {name: expected,
702 'userid': usr.user_id}
702 'userid': usr.user_id}
703 id_, params = _build_data(self.apikey, 'update_user', **kw)
703 id_, params = _build_data(self.apikey, 'update_user', **kw)
704 response = api_call(self, params)
704 response = api_call(self, params)
705
705
706 ret = {
706 ret = {
707 'msg': 'updated user ID:%s %s' % (
707 'msg': 'updated user ID:%s %s' % (
708 usr.user_id, self.TEST_USER_LOGIN),
708 usr.user_id, self.TEST_USER_LOGIN),
709 'user': jsonify(UserModel() \
709 'user': jsonify(UserModel() \
710 .get_by_username(self.TEST_USER_LOGIN) \
710 .get_by_username(self.TEST_USER_LOGIN) \
711 .get_api_data())
711 .get_api_data())
712 }
712 }
713
713
714 expected = ret
714 expected = ret
715 self._compare_ok(id_, expected, given=response.body)
715 self._compare_ok(id_, expected, given=response.body)
716
716
717 def test_api_update_user_no_changed_params(self):
717 def test_api_update_user_no_changed_params(self):
718 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
718 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
719 ret = jsonify(usr.get_api_data())
719 ret = jsonify(usr.get_api_data())
720 id_, params = _build_data(self.apikey, 'update_user',
720 id_, params = _build_data(self.apikey, 'update_user',
721 userid=TEST_USER_ADMIN_LOGIN)
721 userid=TEST_USER_ADMIN_LOGIN)
722
722
723 response = api_call(self, params)
723 response = api_call(self, params)
724 ret = {
724 ret = {
725 'msg': 'updated user ID:%s %s' % (
725 'msg': 'updated user ID:%s %s' % (
726 usr.user_id, TEST_USER_ADMIN_LOGIN),
726 usr.user_id, TEST_USER_ADMIN_LOGIN),
727 'user': ret
727 'user': ret
728 }
728 }
729 expected = ret
729 expected = ret
730 self._compare_ok(id_, expected, given=response.body)
730 self._compare_ok(id_, expected, given=response.body)
731
731
732 def test_api_update_user_by_user_id(self):
732 def test_api_update_user_by_user_id(self):
733 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
733 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
734 ret = jsonify(usr.get_api_data())
734 ret = jsonify(usr.get_api_data())
735 id_, params = _build_data(self.apikey, 'update_user',
735 id_, params = _build_data(self.apikey, 'update_user',
736 userid=usr.user_id)
736 userid=usr.user_id)
737
737
738 response = api_call(self, params)
738 response = api_call(self, params)
739 ret = {
739 ret = {
740 'msg': 'updated user ID:%s %s' % (
740 'msg': 'updated user ID:%s %s' % (
741 usr.user_id, TEST_USER_ADMIN_LOGIN),
741 usr.user_id, TEST_USER_ADMIN_LOGIN),
742 'user': ret
742 'user': ret
743 }
743 }
744 expected = ret
744 expected = ret
745 self._compare_ok(id_, expected, given=response.body)
745 self._compare_ok(id_, expected, given=response.body)
746
746
747 def test_api_update_user_default_user(self):
747 def test_api_update_user_default_user(self):
748 usr = User.get_default_user()
748 usr = User.get_default_user()
749 id_, params = _build_data(self.apikey, 'update_user',
749 id_, params = _build_data(self.apikey, 'update_user',
750 userid=usr.user_id)
750 userid=usr.user_id)
751
751
752 response = api_call(self, params)
752 response = api_call(self, params)
753 expected = 'editing default user is forbidden'
753 expected = 'editing default user is forbidden'
754 self._compare_error(id_, expected, given=response.body)
754 self._compare_error(id_, expected, given=response.body)
755
755
756 @mock.patch.object(UserModel, 'update_user', crash)
756 @mock.patch.object(UserModel, 'update_user', crash)
757 def test_api_update_user_when_exception_happens(self):
757 def test_api_update_user_when_exception_happens(self):
758 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
758 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
759 ret = jsonify(usr.get_api_data())
759 ret = jsonify(usr.get_api_data())
760 id_, params = _build_data(self.apikey, 'update_user',
760 id_, params = _build_data(self.apikey, 'update_user',
761 userid=usr.user_id)
761 userid=usr.user_id)
762
762
763 response = api_call(self, params)
763 response = api_call(self, params)
764 ret = 'failed to update user `%s`' % usr.user_id
764 ret = 'failed to update user `%s`' % usr.user_id
765
765
766 expected = ret
766 expected = ret
767 self._compare_error(id_, expected, given=response.body)
767 self._compare_error(id_, expected, given=response.body)
768
768
769 def test_api_get_repo(self):
769 def test_api_get_repo(self):
770 new_group = 'some_new_group'
770 new_group = 'some_new_group'
771 make_user_group(new_group)
771 make_user_group(new_group)
772 RepoModel().grant_user_group_permission(repo=self.REPO,
772 RepoModel().grant_user_group_permission(repo=self.REPO,
773 group_name=new_group,
773 group_name=new_group,
774 perm='repository.read')
774 perm='repository.read')
775 Session().commit()
775 Session().commit()
776 id_, params = _build_data(self.apikey, 'get_repo',
776 id_, params = _build_data(self.apikey, 'get_repo',
777 repoid=self.REPO)
777 repoid=self.REPO)
778 response = api_call(self, params)
778 response = api_call(self, params)
779
779
780 repo = RepoModel().get_by_repo_name(self.REPO)
780 repo = RepoModel().get_by_repo_name(self.REPO)
781 ret = repo.get_api_data()
781 ret = repo.get_api_data()
782
782
783 members = []
783 members = []
784 followers = []
784 followers = []
785 for user in repo.repo_to_perm:
785 for user in repo.repo_to_perm:
786 perm = user.permission.permission_name
786 perm = user.permission.permission_name
787 user = user.user
787 user = user.user
788 user_data = {'name': user.username, 'type': "user",
788 user_data = {'name': user.username, 'type': "user",
789 'permission': perm}
789 'permission': perm}
790 members.append(user_data)
790 members.append(user_data)
791
791
792 for user_group in repo.users_group_to_perm:
792 for user_group in repo.users_group_to_perm:
793 perm = user_group.permission.permission_name
793 perm = user_group.permission.permission_name
794 user_group = user_group.users_group
794 user_group = user_group.users_group
795 user_group_data = {'name': user_group.users_group_name,
795 user_group_data = {'name': user_group.users_group_name,
796 'type': "user_group", 'permission': perm}
796 'type': "user_group", 'permission': perm}
797 members.append(user_group_data)
797 members.append(user_group_data)
798
798
799 for user in repo.followers:
799 for user in repo.followers:
800 followers.append(user.user.get_api_data())
800 followers.append(user.user.get_api_data())
801
801
802 ret['members'] = members
802 ret['members'] = members
803 ret['followers'] = followers
803 ret['followers'] = followers
804
804
805 expected = ret
805 expected = ret
806 self._compare_ok(id_, expected, given=response.body)
806 self._compare_ok(id_, expected, given=response.body)
807 fixture.destroy_user_group(new_group)
807 fixture.destroy_user_group(new_group)
808
808
809 @parameterized.expand([
809 @parameterized.expand([
810 ('repository.admin',),
810 ('repository.admin',),
811 ('repository.write',),
811 ('repository.write',),
812 ('repository.read',),
812 ('repository.read',),
813 ])
813 ])
814 def test_api_get_repo_by_non_admin(self, grant_perm):
814 def test_api_get_repo_by_non_admin(self, grant_perm):
815 RepoModel().grant_user_permission(repo=self.REPO,
815 RepoModel().grant_user_permission(repo=self.REPO,
816 user=self.TEST_USER_LOGIN,
816 user=self.TEST_USER_LOGIN,
817 perm=grant_perm)
817 perm=grant_perm)
818 Session().commit()
818 Session().commit()
819 id_, params = _build_data(self.apikey_regular, 'get_repo',
819 id_, params = _build_data(self.apikey_regular, 'get_repo',
820 repoid=self.REPO)
820 repoid=self.REPO)
821 response = api_call(self, params)
821 response = api_call(self, params)
822
822
823 repo = RepoModel().get_by_repo_name(self.REPO)
823 repo = RepoModel().get_by_repo_name(self.REPO)
824 ret = repo.get_api_data()
824 ret = repo.get_api_data()
825
825
826 members = []
826 members = []
827 followers = []
827 followers = []
828 self.assertEqual(2, len(repo.repo_to_perm))
828 self.assertEqual(2, len(repo.repo_to_perm))
829 for user in repo.repo_to_perm:
829 for user in repo.repo_to_perm:
830 perm = user.permission.permission_name
830 perm = user.permission.permission_name
831 user_obj = user.user
831 user_obj = user.user
832 user_data = {'name': user_obj.username, 'type': "user",
832 user_data = {'name': user_obj.username, 'type': "user",
833 'permission': perm}
833 'permission': perm}
834 members.append(user_data)
834 members.append(user_data)
835
835
836 for user_group in repo.users_group_to_perm:
836 for user_group in repo.users_group_to_perm:
837 perm = user_group.permission.permission_name
837 perm = user_group.permission.permission_name
838 user_group_obj = user_group.users_group
838 user_group_obj = user_group.users_group
839 user_group_data = {'name': user_group_obj.users_group_name,
839 user_group_data = {'name': user_group_obj.users_group_name,
840 'type': "user_group", 'permission': perm}
840 'type': "user_group", 'permission': perm}
841 members.append(user_group_data)
841 members.append(user_group_data)
842
842
843 for user in repo.followers:
843 for user in repo.followers:
844 followers.append(user.user.get_api_data())
844 followers.append(user.user.get_api_data())
845
845
846 ret['members'] = members
846 ret['members'] = members
847 ret['followers'] = followers
847 ret['followers'] = followers
848
848
849 expected = ret
849 expected = ret
850 try:
850 try:
851 self._compare_ok(id_, expected, given=response.body)
851 self._compare_ok(id_, expected, given=response.body)
852 finally:
852 finally:
853 RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
853 RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
854
854
855 def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
855 def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
856 RepoModel().grant_user_permission(repo=self.REPO,
856 RepoModel().grant_user_permission(repo=self.REPO,
857 user=self.TEST_USER_LOGIN,
857 user=self.TEST_USER_LOGIN,
858 perm='repository.none')
858 perm='repository.none')
859
859
860 id_, params = _build_data(self.apikey_regular, 'get_repo',
860 id_, params = _build_data(self.apikey_regular, 'get_repo',
861 repoid=self.REPO)
861 repoid=self.REPO)
862 response = api_call(self, params)
862 response = api_call(self, params)
863
863
864 expected = 'repository `%s` does not exist' % (self.REPO)
864 expected = 'repository `%s` does not exist' % (self.REPO)
865 self._compare_error(id_, expected, given=response.body)
865 self._compare_error(id_, expected, given=response.body)
866
866
867 def test_api_get_repo_that_doesn_not_exist(self):
867 def test_api_get_repo_that_doesn_not_exist(self):
868 id_, params = _build_data(self.apikey, 'get_repo',
868 id_, params = _build_data(self.apikey, 'get_repo',
869 repoid='no-such-repo')
869 repoid='no-such-repo')
870 response = api_call(self, params)
870 response = api_call(self, params)
871
871
872 ret = 'repository `%s` does not exist' % 'no-such-repo'
872 ret = 'repository `%s` does not exist' % 'no-such-repo'
873 expected = ret
873 expected = ret
874 self._compare_error(id_, expected, given=response.body)
874 self._compare_error(id_, expected, given=response.body)
875
875
876 def test_api_get_repos(self):
876 def test_api_get_repos(self):
877 id_, params = _build_data(self.apikey, 'get_repos')
877 id_, params = _build_data(self.apikey, 'get_repos')
878 response = api_call(self, params)
878 response = api_call(self, params)
879
879
880 result = []
880 result = []
881 for repo in RepoModel().get_all():
881 for repo in RepoModel().get_all():
882 result.append(repo.get_api_data())
882 result.append(repo.get_api_data())
883 ret = jsonify(result)
883 ret = jsonify(result)
884
884
885 expected = ret
885 expected = ret
886 self._compare_ok(id_, expected, given=response.body)
886 self._compare_ok(id_, expected, given=response.body)
887
887
888 def test_api_get_repos_non_admin(self):
888 def test_api_get_repos_non_admin(self):
889 id_, params = _build_data(self.apikey_regular, 'get_repos')
889 id_, params = _build_data(self.apikey_regular, 'get_repos')
890 response = api_call(self, params)
890 response = api_call(self, params)
891
891
892 result = []
892 result = []
893 for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN):
893 for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN):
894 result.append(repo.get_api_data())
894 result.append(repo.get_api_data())
895 ret = jsonify(result)
895 ret = jsonify(result)
896
896
897 expected = ret
897 expected = ret
898 self._compare_ok(id_, expected, given=response.body)
898 self._compare_ok(id_, expected, given=response.body)
899
899
900 @parameterized.expand([('all', 'all'),
900 @parameterized.expand([('all', 'all'),
901 ('dirs', 'dirs'),
901 ('dirs', 'dirs'),
902 ('files', 'files'), ])
902 ('files', 'files'), ])
903 def test_api_get_repo_nodes(self, name, ret_type):
903 def test_api_get_repo_nodes(self, name, ret_type):
904 rev = 'tip'
904 rev = 'tip'
905 path = '/'
905 path = '/'
906 id_, params = _build_data(self.apikey, 'get_repo_nodes',
906 id_, params = _build_data(self.apikey, 'get_repo_nodes',
907 repoid=self.REPO, revision=rev,
907 repoid=self.REPO, revision=rev,
908 root_path=path,
908 root_path=path,
909 ret_type=ret_type)
909 ret_type=ret_type)
910 response = api_call(self, params)
910 response = api_call(self, params)
911
911
912 # we don't the actual return types here since it's tested somewhere
912 # we don't the actual return types here since it's tested somewhere
913 # else
913 # else
914 expected = response.json['result']
914 expected = response.json['result']
915 self._compare_ok(id_, expected, given=response.body)
915 self._compare_ok(id_, expected, given=response.body)
916
916
917 def test_api_get_repo_nodes_bad_revisions(self):
917 def test_api_get_repo_nodes_bad_revisions(self):
918 rev = 'i-dont-exist'
918 rev = 'i-dont-exist'
919 path = '/'
919 path = '/'
920 id_, params = _build_data(self.apikey, 'get_repo_nodes',
920 id_, params = _build_data(self.apikey, 'get_repo_nodes',
921 repoid=self.REPO, revision=rev,
921 repoid=self.REPO, revision=rev,
922 root_path=path, )
922 root_path=path, )
923 response = api_call(self, params)
923 response = api_call(self, params)
924
924
925 expected = 'failed to get repo: `%s` nodes' % self.REPO
925 expected = 'failed to get repo: `%s` nodes' % self.REPO
926 self._compare_error(id_, expected, given=response.body)
926 self._compare_error(id_, expected, given=response.body)
927
927
928 def test_api_get_repo_nodes_bad_path(self):
928 def test_api_get_repo_nodes_bad_path(self):
929 rev = 'tip'
929 rev = 'tip'
930 path = '/idontexits'
930 path = '/idontexits'
931 id_, params = _build_data(self.apikey, 'get_repo_nodes',
931 id_, params = _build_data(self.apikey, 'get_repo_nodes',
932 repoid=self.REPO, revision=rev,
932 repoid=self.REPO, revision=rev,
933 root_path=path, )
933 root_path=path, )
934 response = api_call(self, params)
934 response = api_call(self, params)
935
935
936 expected = 'failed to get repo: `%s` nodes' % self.REPO
936 expected = 'failed to get repo: `%s` nodes' % self.REPO
937 self._compare_error(id_, expected, given=response.body)
937 self._compare_error(id_, expected, given=response.body)
938
938
939 def test_api_get_repo_nodes_bad_ret_type(self):
939 def test_api_get_repo_nodes_bad_ret_type(self):
940 rev = 'tip'
940 rev = 'tip'
941 path = '/'
941 path = '/'
942 ret_type = 'error'
942 ret_type = 'error'
943 id_, params = _build_data(self.apikey, 'get_repo_nodes',
943 id_, params = _build_data(self.apikey, 'get_repo_nodes',
944 repoid=self.REPO, revision=rev,
944 repoid=self.REPO, revision=rev,
945 root_path=path,
945 root_path=path,
946 ret_type=ret_type)
946 ret_type=ret_type)
947 response = api_call(self, params)
947 response = api_call(self, params)
948
948
949 expected = ('ret_type must be one of %s'
949 expected = ('ret_type must be one of %s'
950 % (','.join(['files', 'dirs', 'all'])))
950 % (','.join(['files', 'dirs', 'all'])))
951 self._compare_error(id_, expected, given=response.body)
951 self._compare_error(id_, expected, given=response.body)
952
952
953 @parameterized.expand([('all', 'all', 'repository.write'),
953 @parameterized.expand([('all', 'all', 'repository.write'),
954 ('dirs', 'dirs', 'repository.admin'),
954 ('dirs', 'dirs', 'repository.admin'),
955 ('files', 'files', 'repository.read'), ])
955 ('files', 'files', 'repository.read'), ])
956 def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
956 def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
957 RepoModel().grant_user_permission(repo=self.REPO,
957 RepoModel().grant_user_permission(repo=self.REPO,
958 user=self.TEST_USER_LOGIN,
958 user=self.TEST_USER_LOGIN,
959 perm=grant_perm)
959 perm=grant_perm)
960 Session().commit()
960 Session().commit()
961
961
962 rev = 'tip'
962 rev = 'tip'
963 path = '/'
963 path = '/'
964 id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
964 id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
965 repoid=self.REPO, revision=rev,
965 repoid=self.REPO, revision=rev,
966 root_path=path,
966 root_path=path,
967 ret_type=ret_type)
967 ret_type=ret_type)
968 response = api_call(self, params)
968 response = api_call(self, params)
969
969
970 # we don't the actual return types here since it's tested somewhere
970 # we don't the actual return types here since it's tested somewhere
971 # else
971 # else
972 expected = response.json['result']
972 expected = response.json['result']
973 try:
973 try:
974 self._compare_ok(id_, expected, given=response.body)
974 self._compare_ok(id_, expected, given=response.body)
975 finally:
975 finally:
976 RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
976 RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
977
977
978 def test_api_create_repo(self):
978 def test_api_create_repo(self):
979 repo_name = 'api-repo'
979 repo_name = 'api-repo'
980 id_, params = _build_data(self.apikey, 'create_repo',
980 id_, params = _build_data(self.apikey, 'create_repo',
981 repo_name=repo_name,
981 repo_name=repo_name,
982 owner=TEST_USER_ADMIN_LOGIN,
982 owner=TEST_USER_ADMIN_LOGIN,
983 repo_type=self.REPO_TYPE,
983 repo_type=self.REPO_TYPE,
984 )
984 )
985 response = api_call(self, params)
985 response = api_call(self, params)
986
986
987 repo = RepoModel().get_by_repo_name(repo_name)
987 repo = RepoModel().get_by_repo_name(repo_name)
988 self.assertNotEqual(repo, None)
988 self.assertNotEqual(repo, None)
989 ret = {
989 ret = {
990 'msg': 'Created new repository `%s`' % repo_name,
990 'msg': 'Created new repository `%s`' % repo_name,
991 'success': True,
991 'success': True,
992 'task': None,
992 'task': None,
993 }
993 }
994 expected = ret
994 expected = ret
995 self._compare_ok(id_, expected, given=response.body)
995 self._compare_ok(id_, expected, given=response.body)
996 fixture.destroy_repo(repo_name)
996 fixture.destroy_repo(repo_name)
997
997
998 def test_api_create_repo_in_group(self):
998 def test_api_create_repo_in_group(self):
999 repo_name = 'my_gr/api-repo'
999 repo_name = 'my_gr/api-repo'
1000 id_, params = _build_data(self.apikey, 'create_repo',
1000 id_, params = _build_data(self.apikey, 'create_repo',
1001 repo_name=repo_name,
1001 repo_name=repo_name,
1002 owner=TEST_USER_ADMIN_LOGIN,
1002 owner=TEST_USER_ADMIN_LOGIN,
1003 repo_type=self.REPO_TYPE,)
1003 repo_type=self.REPO_TYPE,)
1004 response = api_call(self, params)
1004 response = api_call(self, params)
1005 print params
1005 print params
1006 repo = RepoModel().get_by_repo_name(repo_name)
1006 repo = RepoModel().get_by_repo_name(repo_name)
1007 self.assertNotEqual(repo, None)
1007 self.assertNotEqual(repo, None)
1008 ret = {
1008 ret = {
1009 'msg': 'Created new repository `%s`' % repo_name,
1009 'msg': 'Created new repository `%s`' % repo_name,
1010 'success': True,
1010 'success': True,
1011 'task': None,
1011 'task': None,
1012 }
1012 }
1013 expected = ret
1013 expected = ret
1014 self._compare_ok(id_, expected, given=response.body)
1014 self._compare_ok(id_, expected, given=response.body)
1015 fixture.destroy_repo(repo_name)
1015 fixture.destroy_repo(repo_name)
1016 fixture.destroy_repo_group('my_gr')
1016 fixture.destroy_repo_group('my_gr')
1017
1017
1018 def test_api_create_repo_unknown_owner(self):
1018 def test_api_create_repo_unknown_owner(self):
1019 repo_name = 'api-repo'
1019 repo_name = 'api-repo'
1020 owner = 'i-dont-exist'
1020 owner = 'i-dont-exist'
1021 id_, params = _build_data(self.apikey, 'create_repo',
1021 id_, params = _build_data(self.apikey, 'create_repo',
1022 repo_name=repo_name,
1022 repo_name=repo_name,
1023 owner=owner,
1023 owner=owner,
1024 repo_type=self.REPO_TYPE,
1024 repo_type=self.REPO_TYPE,
1025 )
1025 )
1026 response = api_call(self, params)
1026 response = api_call(self, params)
1027 expected = 'user `%s` does not exist' % owner
1027 expected = 'user `%s` does not exist' % owner
1028 self._compare_error(id_, expected, given=response.body)
1028 self._compare_error(id_, expected, given=response.body)
1029
1029
1030 def test_api_create_repo_dont_specify_owner(self):
1030 def test_api_create_repo_dont_specify_owner(self):
1031 repo_name = 'api-repo'
1031 repo_name = 'api-repo'
1032 owner = 'i-dont-exist'
1032 owner = 'i-dont-exist'
1033 id_, params = _build_data(self.apikey, 'create_repo',
1033 id_, params = _build_data(self.apikey, 'create_repo',
1034 repo_name=repo_name,
1034 repo_name=repo_name,
1035 repo_type=self.REPO_TYPE,
1035 repo_type=self.REPO_TYPE,
1036 )
1036 )
1037 response = api_call(self, params)
1037 response = api_call(self, params)
1038
1038
1039 repo = RepoModel().get_by_repo_name(repo_name)
1039 repo = RepoModel().get_by_repo_name(repo_name)
1040 self.assertNotEqual(repo, None)
1040 self.assertNotEqual(repo, None)
1041 ret = {
1041 ret = {
1042 'msg': 'Created new repository `%s`' % repo_name,
1042 'msg': 'Created new repository `%s`' % repo_name,
1043 'success': True,
1043 'success': True,
1044 'task': None,
1044 'task': None,
1045 }
1045 }
1046 expected = ret
1046 expected = ret
1047 self._compare_ok(id_, expected, given=response.body)
1047 self._compare_ok(id_, expected, given=response.body)
1048 fixture.destroy_repo(repo_name)
1048 fixture.destroy_repo(repo_name)
1049
1049
1050 def test_api_create_repo_by_non_admin(self):
1050 def test_api_create_repo_by_non_admin(self):
1051 repo_name = 'api-repo'
1051 repo_name = 'api-repo'
1052 owner = 'i-dont-exist'
1052 owner = 'i-dont-exist'
1053 id_, params = _build_data(self.apikey_regular, 'create_repo',
1053 id_, params = _build_data(self.apikey_regular, 'create_repo',
1054 repo_name=repo_name,
1054 repo_name=repo_name,
1055 repo_type=self.REPO_TYPE,
1055 repo_type=self.REPO_TYPE,
1056 )
1056 )
1057 response = api_call(self, params)
1057 response = api_call(self, params)
1058
1058
1059 repo = RepoModel().get_by_repo_name(repo_name)
1059 repo = RepoModel().get_by_repo_name(repo_name)
1060 self.assertNotEqual(repo, None)
1060 self.assertNotEqual(repo, None)
1061 ret = {
1061 ret = {
1062 'msg': 'Created new repository `%s`' % repo_name,
1062 'msg': 'Created new repository `%s`' % repo_name,
1063 'success': True,
1063 'success': True,
1064 'task': None,
1064 'task': None,
1065 }
1065 }
1066 expected = ret
1066 expected = ret
1067 self._compare_ok(id_, expected, given=response.body)
1067 self._compare_ok(id_, expected, given=response.body)
1068 fixture.destroy_repo(repo_name)
1068 fixture.destroy_repo(repo_name)
1069
1069
1070 def test_api_create_repo_by_non_admin_specify_owner(self):
1070 def test_api_create_repo_by_non_admin_specify_owner(self):
1071 repo_name = 'api-repo'
1071 repo_name = 'api-repo'
1072 owner = 'i-dont-exist'
1072 owner = 'i-dont-exist'
1073 id_, params = _build_data(self.apikey_regular, 'create_repo',
1073 id_, params = _build_data(self.apikey_regular, 'create_repo',
1074 repo_name=repo_name,
1074 repo_name=repo_name,
1075 repo_type=self.REPO_TYPE,
1075 repo_type=self.REPO_TYPE,
1076 owner=owner)
1076 owner=owner)
1077 response = api_call(self, params)
1077 response = api_call(self, params)
1078
1078
1079 expected = 'Only Kallithea admin can specify `owner` param'
1079 expected = 'Only Kallithea admin can specify `owner` param'
1080 self._compare_error(id_, expected, given=response.body)
1080 self._compare_error(id_, expected, given=response.body)
1081 fixture.destroy_repo(repo_name)
1081 fixture.destroy_repo(repo_name)
1082
1082
1083 def test_api_create_repo_exists(self):
1083 def test_api_create_repo_exists(self):
1084 repo_name = self.REPO
1084 repo_name = self.REPO
1085 id_, params = _build_data(self.apikey, 'create_repo',
1085 id_, params = _build_data(self.apikey, 'create_repo',
1086 repo_name=repo_name,
1086 repo_name=repo_name,
1087 owner=TEST_USER_ADMIN_LOGIN,
1087 owner=TEST_USER_ADMIN_LOGIN,
1088 repo_type=self.REPO_TYPE,)
1088 repo_type=self.REPO_TYPE,)
1089 response = api_call(self, params)
1089 response = api_call(self, params)
1090 expected = "repo `%s` already exist" % repo_name
1090 expected = "repo `%s` already exist" % repo_name
1091 self._compare_error(id_, expected, given=response.body)
1091 self._compare_error(id_, expected, given=response.body)
1092
1092
1093 @mock.patch.object(RepoModel, 'create', crash)
1093 @mock.patch.object(RepoModel, 'create', crash)
1094 def test_api_create_repo_exception_occurred(self):
1094 def test_api_create_repo_exception_occurred(self):
1095 repo_name = 'api-repo'
1095 repo_name = 'api-repo'
1096 id_, params = _build_data(self.apikey, 'create_repo',
1096 id_, params = _build_data(self.apikey, 'create_repo',
1097 repo_name=repo_name,
1097 repo_name=repo_name,
1098 owner=TEST_USER_ADMIN_LOGIN,
1098 owner=TEST_USER_ADMIN_LOGIN,
1099 repo_type=self.REPO_TYPE,)
1099 repo_type=self.REPO_TYPE,)
1100 response = api_call(self, params)
1100 response = api_call(self, params)
1101 expected = 'failed to create repository `%s`' % repo_name
1101 expected = 'failed to create repository `%s`' % repo_name
1102 self._compare_error(id_, expected, given=response.body)
1102 self._compare_error(id_, expected, given=response.body)
1103
1103
1104 @parameterized.expand([
1104 @parameterized.expand([
1105 ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
1105 ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
1106 ('description', {'description': 'new description'}),
1106 ('description', {'description': 'new description'}),
1107 ('active', {'active': True}),
1107 ('active', {'active': True}),
1108 ('active', {'active': False}),
1108 ('active', {'active': False}),
1109 ('clone_uri', {'clone_uri': 'http://foo.com/repo'}),
1109 ('clone_uri', {'clone_uri': 'http://foo.com/repo'}),
1110 ('clone_uri', {'clone_uri': None}),
1110 ('clone_uri', {'clone_uri': None}),
1111 ('landing_rev', {'landing_rev': 'branch:master'}),
1111 ('landing_rev', {'landing_rev': 'branch:master'}),
1112 ('enable_statistics', {'enable_statistics': True}),
1112 ('enable_statistics', {'enable_statistics': True}),
1113 ('enable_locking', {'enable_locking': True}),
1113 ('enable_locking', {'enable_locking': True}),
1114 ('enable_downloads', {'enable_downloads': True}),
1114 ('enable_downloads', {'enable_downloads': True}),
1115 ('name', {'name': 'new_repo_name'}),
1115 ('name', {'name': 'new_repo_name'}),
1116 ('repo_group', {'group': 'test_group_for_update'}),
1116 ('repo_group', {'group': 'test_group_for_update'}),
1117 ])
1117 ])
1118 def test_api_update_repo(self, changing_attr, updates):
1118 def test_api_update_repo(self, changing_attr, updates):
1119 repo_name = 'api_update_me'
1119 repo_name = 'api_update_me'
1120 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1120 repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1121 if changing_attr == 'repo_group':
1121 if changing_attr == 'repo_group':
1122 fixture.create_repo_group(updates['group'])
1122 fixture.create_repo_group(updates['group'])
1123
1123
1124 id_, params = _build_data(self.apikey, 'update_repo',
1124 id_, params = _build_data(self.apikey, 'update_repo',
1125 repoid=repo_name, **updates)
1125 repoid=repo_name, **updates)
1126 response = api_call(self, params)
1126 response = api_call(self, params)
1127 if changing_attr == 'name':
1127 if changing_attr == 'name':
1128 repo_name = updates['name']
1128 repo_name = updates['name']
1129 if changing_attr == 'repo_group':
1129 if changing_attr == 'repo_group':
1130 repo_name = '/'.join([updates['group'], repo_name])
1130 repo_name = '/'.join([updates['group'], repo_name])
1131 try:
1131 try:
1132 expected = {
1132 expected = {
1133 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
1133 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
1134 'repository': repo.get_api_data()
1134 'repository': repo.get_api_data()
1135 }
1135 }
1136 self._compare_ok(id_, expected, given=response.body)
1136 self._compare_ok(id_, expected, given=response.body)
1137 finally:
1137 finally:
1138 fixture.destroy_repo(repo_name)
1138 fixture.destroy_repo(repo_name)
1139 if changing_attr == 'repo_group':
1139 if changing_attr == 'repo_group':
1140 fixture.destroy_repo_group(updates['group'])
1140 fixture.destroy_repo_group(updates['group'])
1141
1141
1142 def test_api_update_repo_repo_group_does_not_exist(self):
1142 def test_api_update_repo_repo_group_does_not_exist(self):
1143 repo_name = 'admin_owned'
1143 repo_name = 'admin_owned'
1144 fixture.create_repo(repo_name)
1144 fixture.create_repo(repo_name)
1145 updates = {'group': 'test_group_for_update'}
1145 updates = {'group': 'test_group_for_update'}
1146 id_, params = _build_data(self.apikey, 'update_repo',
1146 id_, params = _build_data(self.apikey, 'update_repo',
1147 repoid=repo_name, **updates)
1147 repoid=repo_name, **updates)
1148 response = api_call(self, params)
1148 response = api_call(self, params)
1149 try:
1149 try:
1150 expected = 'repository group `%s` does not exist' % updates['group']
1150 expected = 'repository group `%s` does not exist' % updates['group']
1151 self._compare_error(id_, expected, given=response.body)
1151 self._compare_error(id_, expected, given=response.body)
1152 finally:
1152 finally:
1153 fixture.destroy_repo(repo_name)
1153 fixture.destroy_repo(repo_name)
1154
1154
1155 def test_api_update_repo_regular_user_not_allowed(self):
1155 def test_api_update_repo_regular_user_not_allowed(self):
1156 repo_name = 'admin_owned'
1156 repo_name = 'admin_owned'
1157 fixture.create_repo(repo_name)
1157 fixture.create_repo(repo_name)
1158 updates = {'active': False}
1158 updates = {'active': False}
1159 id_, params = _build_data(self.apikey_regular, 'update_repo',
1159 id_, params = _build_data(self.apikey_regular, 'update_repo',
1160 repoid=repo_name, **updates)
1160 repoid=repo_name, **updates)
1161 response = api_call(self, params)
1161 response = api_call(self, params)
1162 try:
1162 try:
1163 expected = 'repository `%s` does not exist' % repo_name
1163 expected = 'repository `%s` does not exist' % repo_name
1164 self._compare_error(id_, expected, given=response.body)
1164 self._compare_error(id_, expected, given=response.body)
1165 finally:
1165 finally:
1166 fixture.destroy_repo(repo_name)
1166 fixture.destroy_repo(repo_name)
1167
1167
1168 @mock.patch.object(RepoModel, 'update', crash)
1168 @mock.patch.object(RepoModel, 'update', crash)
1169 def test_api_update_repo_exception_occured(self):
1169 def test_api_update_repo_exception_occured(self):
1170 repo_name = 'api_update_me'
1170 repo_name = 'api_update_me'
1171 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1171 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1172 id_, params = _build_data(self.apikey, 'update_repo',
1172 id_, params = _build_data(self.apikey, 'update_repo',
1173 repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
1173 repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
1174 response = api_call(self, params)
1174 response = api_call(self, params)
1175 try:
1175 try:
1176 expected = 'failed to update repo `%s`' % repo_name
1176 expected = 'failed to update repo `%s`' % repo_name
1177 self._compare_error(id_, expected, given=response.body)
1177 self._compare_error(id_, expected, given=response.body)
1178 finally:
1178 finally:
1179 fixture.destroy_repo(repo_name)
1179 fixture.destroy_repo(repo_name)
1180
1180
1181 def test_api_delete_repo(self):
1181 def test_api_delete_repo(self):
1182 repo_name = 'api_delete_me'
1182 repo_name = 'api_delete_me'
1183 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1183 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1184
1184
1185 id_, params = _build_data(self.apikey, 'delete_repo',
1185 id_, params = _build_data(self.apikey, 'delete_repo',
1186 repoid=repo_name, )
1186 repoid=repo_name, )
1187 response = api_call(self, params)
1187 response = api_call(self, params)
1188
1188
1189 ret = {
1189 ret = {
1190 'msg': 'Deleted repository `%s`' % repo_name,
1190 'msg': 'Deleted repository `%s`' % repo_name,
1191 'success': True
1191 'success': True
1192 }
1192 }
1193 try:
1193 try:
1194 expected = ret
1194 expected = ret
1195 self._compare_ok(id_, expected, given=response.body)
1195 self._compare_ok(id_, expected, given=response.body)
1196 finally:
1196 finally:
1197 fixture.destroy_repo(repo_name)
1197 fixture.destroy_repo(repo_name)
1198
1198
1199 def test_api_delete_repo_by_non_admin(self):
1199 def test_api_delete_repo_by_non_admin(self):
1200 repo_name = 'api_delete_me'
1200 repo_name = 'api_delete_me'
1201 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
1201 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
1202 cur_user=self.TEST_USER_LOGIN)
1202 cur_user=self.TEST_USER_LOGIN)
1203 id_, params = _build_data(self.apikey_regular, 'delete_repo',
1203 id_, params = _build_data(self.apikey_regular, 'delete_repo',
1204 repoid=repo_name, )
1204 repoid=repo_name, )
1205 response = api_call(self, params)
1205 response = api_call(self, params)
1206
1206
1207 ret = {
1207 ret = {
1208 'msg': 'Deleted repository `%s`' % repo_name,
1208 'msg': 'Deleted repository `%s`' % repo_name,
1209 'success': True
1209 'success': True
1210 }
1210 }
1211 try:
1211 try:
1212 expected = ret
1212 expected = ret
1213 self._compare_ok(id_, expected, given=response.body)
1213 self._compare_ok(id_, expected, given=response.body)
1214 finally:
1214 finally:
1215 fixture.destroy_repo(repo_name)
1215 fixture.destroy_repo(repo_name)
1216
1216
1217 def test_api_delete_repo_by_non_admin_no_permission(self):
1217 def test_api_delete_repo_by_non_admin_no_permission(self):
1218 repo_name = 'api_delete_me'
1218 repo_name = 'api_delete_me'
1219 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1219 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1220 try:
1220 try:
1221 id_, params = _build_data(self.apikey_regular, 'delete_repo',
1221 id_, params = _build_data(self.apikey_regular, 'delete_repo',
1222 repoid=repo_name, )
1222 repoid=repo_name, )
1223 response = api_call(self, params)
1223 response = api_call(self, params)
1224 expected = 'repository `%s` does not exist' % (repo_name)
1224 expected = 'repository `%s` does not exist' % (repo_name)
1225 self._compare_error(id_, expected, given=response.body)
1225 self._compare_error(id_, expected, given=response.body)
1226 finally:
1226 finally:
1227 fixture.destroy_repo(repo_name)
1227 fixture.destroy_repo(repo_name)
1228
1228
1229 def test_api_delete_repo_exception_occurred(self):
1229 def test_api_delete_repo_exception_occurred(self):
1230 repo_name = 'api_delete_me'
1230 repo_name = 'api_delete_me'
1231 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1231 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
1232 try:
1232 try:
1233 with mock.patch.object(RepoModel, 'delete', crash):
1233 with mock.patch.object(RepoModel, 'delete', crash):
1234 id_, params = _build_data(self.apikey, 'delete_repo',
1234 id_, params = _build_data(self.apikey, 'delete_repo',
1235 repoid=repo_name, )
1235 repoid=repo_name, )
1236 response = api_call(self, params)
1236 response = api_call(self, params)
1237
1237
1238 expected = 'failed to delete repository `%s`' % repo_name
1238 expected = 'failed to delete repository `%s`' % repo_name
1239 self._compare_error(id_, expected, given=response.body)
1239 self._compare_error(id_, expected, given=response.body)
1240 finally:
1240 finally:
1241 fixture.destroy_repo(repo_name)
1241 fixture.destroy_repo(repo_name)
1242
1242
1243 def test_api_fork_repo(self):
1243 def test_api_fork_repo(self):
1244 fork_name = 'api-repo-fork'
1244 fork_name = 'api-repo-fork'
1245 id_, params = _build_data(self.apikey, 'fork_repo',
1245 id_, params = _build_data(self.apikey, 'fork_repo',
1246 repoid=self.REPO,
1246 repoid=self.REPO,
1247 fork_name=fork_name,
1247 fork_name=fork_name,
1248 owner=TEST_USER_ADMIN_LOGIN,
1248 owner=TEST_USER_ADMIN_LOGIN,
1249 )
1249 )
1250 response = api_call(self, params)
1250 response = api_call(self, params)
1251
1251
1252 ret = {
1252 ret = {
1253 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
1253 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
1254 fork_name),
1254 fork_name),
1255 'success': True,
1255 'success': True,
1256 'task': None,
1256 'task': None,
1257 }
1257 }
1258 expected = ret
1258 expected = ret
1259 self._compare_ok(id_, expected, given=response.body)
1259 self._compare_ok(id_, expected, given=response.body)
1260 fixture.destroy_repo(fork_name)
1260 fixture.destroy_repo(fork_name)
1261
1261
1262 def test_api_fork_repo_non_admin(self):
1262 def test_api_fork_repo_non_admin(self):
1263 fork_name = 'api-repo-fork'
1263 fork_name = 'api-repo-fork'
1264 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1264 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1265 repoid=self.REPO,
1265 repoid=self.REPO,
1266 fork_name=fork_name,
1266 fork_name=fork_name,
1267 )
1267 )
1268 response = api_call(self, params)
1268 response = api_call(self, params)
1269
1269
1270 ret = {
1270 ret = {
1271 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
1271 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
1272 fork_name),
1272 fork_name),
1273 'success': True,
1273 'success': True,
1274 'task': None,
1274 'task': None,
1275 }
1275 }
1276 expected = ret
1276 expected = ret
1277 self._compare_ok(id_, expected, given=response.body)
1277 self._compare_ok(id_, expected, given=response.body)
1278 fixture.destroy_repo(fork_name)
1278 fixture.destroy_repo(fork_name)
1279
1279
1280 def test_api_fork_repo_non_admin_specify_owner(self):
1280 def test_api_fork_repo_non_admin_specify_owner(self):
1281 fork_name = 'api-repo-fork'
1281 fork_name = 'api-repo-fork'
1282 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1282 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1283 repoid=self.REPO,
1283 repoid=self.REPO,
1284 fork_name=fork_name,
1284 fork_name=fork_name,
1285 owner=TEST_USER_ADMIN_LOGIN,
1285 owner=TEST_USER_ADMIN_LOGIN,
1286 )
1286 )
1287 response = api_call(self, params)
1287 response = api_call(self, params)
1288 expected = 'Only Kallithea admin can specify `owner` param'
1288 expected = 'Only Kallithea admin can specify `owner` param'
1289 self._compare_error(id_, expected, given=response.body)
1289 self._compare_error(id_, expected, given=response.body)
1290 fixture.destroy_repo(fork_name)
1290 fixture.destroy_repo(fork_name)
1291
1291
1292 def test_api_fork_repo_non_admin_no_permission_to_fork(self):
1292 def test_api_fork_repo_non_admin_no_permission_to_fork(self):
1293 RepoModel().grant_user_permission(repo=self.REPO,
1293 RepoModel().grant_user_permission(repo=self.REPO,
1294 user=self.TEST_USER_LOGIN,
1294 user=self.TEST_USER_LOGIN,
1295 perm='repository.none')
1295 perm='repository.none')
1296 fork_name = 'api-repo-fork'
1296 fork_name = 'api-repo-fork'
1297 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1297 id_, params = _build_data(self.apikey_regular, 'fork_repo',
1298 repoid=self.REPO,
1298 repoid=self.REPO,
1299 fork_name=fork_name,
1299 fork_name=fork_name,
1300 )
1300 )
1301 response = api_call(self, params)
1301 response = api_call(self, params)
1302 expected = 'repository `%s` does not exist' % (self.REPO)
1302 expected = 'repository `%s` does not exist' % (self.REPO)
1303 self._compare_error(id_, expected, given=response.body)
1303 self._compare_error(id_, expected, given=response.body)
1304 fixture.destroy_repo(fork_name)
1304 fixture.destroy_repo(fork_name)
1305
1305
1306 def test_api_fork_repo_unknown_owner(self):
1306 def test_api_fork_repo_unknown_owner(self):
1307 fork_name = 'api-repo-fork'
1307 fork_name = 'api-repo-fork'
1308 owner = 'i-dont-exist'
1308 owner = 'i-dont-exist'
1309 id_, params = _build_data(self.apikey, 'fork_repo',
1309 id_, params = _build_data(self.apikey, 'fork_repo',
1310 repoid=self.REPO,
1310 repoid=self.REPO,
1311 fork_name=fork_name,
1311 fork_name=fork_name,
1312 owner=owner,
1312 owner=owner,
1313 )
1313 )
1314 response = api_call(self, params)
1314 response = api_call(self, params)
1315 expected = 'user `%s` does not exist' % owner
1315 expected = 'user `%s` does not exist' % owner
1316 self._compare_error(id_, expected, given=response.body)
1316 self._compare_error(id_, expected, given=response.body)
1317
1317
1318 def test_api_fork_repo_fork_exists(self):
1318 def test_api_fork_repo_fork_exists(self):
1319 fork_name = 'api-repo-fork'
1319 fork_name = 'api-repo-fork'
1320 fixture.create_fork(self.REPO, fork_name)
1320 fixture.create_fork(self.REPO, fork_name)
1321
1321
1322 try:
1322 try:
1323 fork_name = 'api-repo-fork'
1323 fork_name = 'api-repo-fork'
1324
1324
1325 id_, params = _build_data(self.apikey, 'fork_repo',
1325 id_, params = _build_data(self.apikey, 'fork_repo',
1326 repoid=self.REPO,
1326 repoid=self.REPO,
1327 fork_name=fork_name,
1327 fork_name=fork_name,
1328 owner=TEST_USER_ADMIN_LOGIN,
1328 owner=TEST_USER_ADMIN_LOGIN,
1329 )
1329 )
1330 response = api_call(self, params)
1330 response = api_call(self, params)
1331
1331
1332 expected = "fork `%s` already exist" % fork_name
1332 expected = "fork `%s` already exist" % fork_name
1333 self._compare_error(id_, expected, given=response.body)
1333 self._compare_error(id_, expected, given=response.body)
1334 finally:
1334 finally:
1335 fixture.destroy_repo(fork_name)
1335 fixture.destroy_repo(fork_name)
1336
1336
1337 def test_api_fork_repo_repo_exists(self):
1337 def test_api_fork_repo_repo_exists(self):
1338 fork_name = self.REPO
1338 fork_name = self.REPO
1339
1339
1340 id_, params = _build_data(self.apikey, 'fork_repo',
1340 id_, params = _build_data(self.apikey, 'fork_repo',
1341 repoid=self.REPO,
1341 repoid=self.REPO,
1342 fork_name=fork_name,
1342 fork_name=fork_name,
1343 owner=TEST_USER_ADMIN_LOGIN,
1343 owner=TEST_USER_ADMIN_LOGIN,
1344 )
1344 )
1345 response = api_call(self, params)
1345 response = api_call(self, params)
1346
1346
1347 expected = "repo `%s` already exist" % fork_name
1347 expected = "repo `%s` already exist" % fork_name
1348 self._compare_error(id_, expected, given=response.body)
1348 self._compare_error(id_, expected, given=response.body)
1349
1349
1350 @mock.patch.object(RepoModel, 'create_fork', crash)
1350 @mock.patch.object(RepoModel, 'create_fork', crash)
1351 def test_api_fork_repo_exception_occurred(self):
1351 def test_api_fork_repo_exception_occurred(self):
1352 fork_name = 'api-repo-fork'
1352 fork_name = 'api-repo-fork'
1353 id_, params = _build_data(self.apikey, 'fork_repo',
1353 id_, params = _build_data(self.apikey, 'fork_repo',
1354 repoid=self.REPO,
1354 repoid=self.REPO,
1355 fork_name=fork_name,
1355 fork_name=fork_name,
1356 owner=TEST_USER_ADMIN_LOGIN,
1356 owner=TEST_USER_ADMIN_LOGIN,
1357 )
1357 )
1358 response = api_call(self, params)
1358 response = api_call(self, params)
1359
1359
1360 expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
1360 expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
1361 fork_name)
1361 fork_name)
1362 self._compare_error(id_, expected, given=response.body)
1362 self._compare_error(id_, expected, given=response.body)
1363
1363
1364 def test_api_get_user_group(self):
1364 def test_api_get_user_group(self):
1365 id_, params = _build_data(self.apikey, 'get_user_group',
1365 id_, params = _build_data(self.apikey, 'get_user_group',
1366 usergroupid=TEST_USER_GROUP)
1366 usergroupid=TEST_USER_GROUP)
1367 response = api_call(self, params)
1367 response = api_call(self, params)
1368
1368
1369 user_group = UserGroupModel().get_group(TEST_USER_GROUP)
1369 user_group = UserGroupModel().get_group(TEST_USER_GROUP)
1370 members = []
1370 members = []
1371 for user in user_group.members:
1371 for user in user_group.members:
1372 user = user.user
1372 user = user.user
1373 members.append(user.get_api_data())
1373 members.append(user.get_api_data())
1374
1374
1375 ret = user_group.get_api_data()
1375 ret = user_group.get_api_data()
1376 ret['members'] = members
1376 ret['members'] = members
1377 expected = ret
1377 expected = ret
1378 self._compare_ok(id_, expected, given=response.body)
1378 self._compare_ok(id_, expected, given=response.body)
1379
1379
1380 def test_api_get_user_groups(self):
1380 def test_api_get_user_groups(self):
1381 gr_name = 'test_user_group2'
1381 gr_name = 'test_user_group2'
1382 make_user_group(gr_name)
1382 make_user_group(gr_name)
1383
1383
1384 id_, params = _build_data(self.apikey, 'get_user_groups', )
1384 id_, params = _build_data(self.apikey, 'get_user_groups', )
1385 response = api_call(self, params)
1385 response = api_call(self, params)
1386
1386
1387 try:
1387 try:
1388 expected = []
1388 expected = []
1389 for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
1389 for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
1390 user_group = UserGroupModel().get_group(gr_name)
1390 user_group = UserGroupModel().get_group(gr_name)
1391 ret = user_group.get_api_data()
1391 ret = user_group.get_api_data()
1392 expected.append(ret)
1392 expected.append(ret)
1393 self._compare_ok(id_, expected, given=response.body)
1393 self._compare_ok(id_, expected, given=response.body)
1394 finally:
1394 finally:
1395 fixture.destroy_user_group(gr_name)
1395 fixture.destroy_user_group(gr_name)
1396
1396
1397 def test_api_create_user_group(self):
1397 def test_api_create_user_group(self):
1398 group_name = 'some_new_group'
1398 group_name = 'some_new_group'
1399 id_, params = _build_data(self.apikey, 'create_user_group',
1399 id_, params = _build_data(self.apikey, 'create_user_group',
1400 group_name=group_name)
1400 group_name=group_name)
1401 response = api_call(self, params)
1401 response = api_call(self, params)
1402
1402
1403 ret = {
1403 ret = {
1404 'msg': 'created new user group `%s`' % group_name,
1404 'msg': 'created new user group `%s`' % group_name,
1405 'user_group': jsonify(UserGroupModel() \
1405 'user_group': jsonify(UserGroupModel() \
1406 .get_by_name(group_name) \
1406 .get_by_name(group_name) \
1407 .get_api_data())
1407 .get_api_data())
1408 }
1408 }
1409 expected = ret
1409 expected = ret
1410 self._compare_ok(id_, expected, given=response.body)
1410 self._compare_ok(id_, expected, given=response.body)
1411
1411
1412 fixture.destroy_user_group(group_name)
1412 fixture.destroy_user_group(group_name)
1413
1413
1414 def test_api_get_user_group_that_exist(self):
1414 def test_api_get_user_group_that_exist(self):
1415 id_, params = _build_data(self.apikey, 'create_user_group',
1415 id_, params = _build_data(self.apikey, 'create_user_group',
1416 group_name=TEST_USER_GROUP)
1416 group_name=TEST_USER_GROUP)
1417 response = api_call(self, params)
1417 response = api_call(self, params)
1418
1418
1419 expected = "user group `%s` already exist" % TEST_USER_GROUP
1419 expected = "user group `%s` already exist" % TEST_USER_GROUP
1420 self._compare_error(id_, expected, given=response.body)
1420 self._compare_error(id_, expected, given=response.body)
1421
1421
1422 @mock.patch.object(UserGroupModel, 'create', crash)
1422 @mock.patch.object(UserGroupModel, 'create', crash)
1423 def test_api_get_user_group_exception_occurred(self):
1423 def test_api_get_user_group_exception_occurred(self):
1424 group_name = 'exception_happens'
1424 group_name = 'exception_happens'
1425 id_, params = _build_data(self.apikey, 'create_user_group',
1425 id_, params = _build_data(self.apikey, 'create_user_group',
1426 group_name=group_name)
1426 group_name=group_name)
1427 response = api_call(self, params)
1427 response = api_call(self, params)
1428
1428
1429 expected = 'failed to create group `%s`' % group_name
1429 expected = 'failed to create group `%s`' % group_name
1430 self._compare_error(id_, expected, given=response.body)
1430 self._compare_error(id_, expected, given=response.body)
1431
1431
1432 @parameterized.expand([('group_name', {'group_name': 'new_group_name'}),
1432 @parameterized.expand([('group_name', {'group_name': 'new_group_name'}),
1433 ('group_name', {'group_name': 'test_group_for_update'}),
1433 ('group_name', {'group_name': 'test_group_for_update'}),
1434 ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
1434 ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
1435 ('active', {'active': False}),
1435 ('active', {'active': False}),
1436 ('active', {'active': True})])
1436 ('active', {'active': True})])
1437 def test_api_update_user_group(self, changing_attr, updates):
1437 def test_api_update_user_group(self, changing_attr, updates):
1438 gr_name = 'test_group_for_update'
1438 gr_name = 'test_group_for_update'
1439 user_group = fixture.create_user_group(gr_name)
1439 user_group = fixture.create_user_group(gr_name)
1440 id_, params = _build_data(self.apikey, 'update_user_group',
1440 id_, params = _build_data(self.apikey, 'update_user_group',
1441 usergroupid=gr_name, **updates)
1441 usergroupid=gr_name, **updates)
1442 response = api_call(self, params)
1442 response = api_call(self, params)
1443 try:
1443 try:
1444 expected = {
1444 expected = {
1445 'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
1445 'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
1446 user_group.users_group_name),
1446 user_group.users_group_name),
1447 'user_group': user_group.get_api_data()
1447 'user_group': user_group.get_api_data()
1448 }
1448 }
1449 self._compare_ok(id_, expected, given=response.body)
1449 self._compare_ok(id_, expected, given=response.body)
1450 finally:
1450 finally:
1451 if changing_attr == 'group_name':
1451 if changing_attr == 'group_name':
1452 # switch to updated name for proper cleanup
1452 # switch to updated name for proper cleanup
1453 gr_name = updates['group_name']
1453 gr_name = updates['group_name']
1454 fixture.destroy_user_group(gr_name)
1454 fixture.destroy_user_group(gr_name)
1455
1455
1456 @mock.patch.object(UserGroupModel, 'update', crash)
1456 @mock.patch.object(UserGroupModel, 'update', crash)
1457 def test_api_update_user_group_exception_occured(self):
1457 def test_api_update_user_group_exception_occured(self):
1458 gr_name = 'test_group'
1458 gr_name = 'test_group'
1459 fixture.create_user_group(gr_name)
1459 fixture.create_user_group(gr_name)
1460 id_, params = _build_data(self.apikey, 'update_user_group',
1460 id_, params = _build_data(self.apikey, 'update_user_group',
1461 usergroupid=gr_name)
1461 usergroupid=gr_name)
1462 response = api_call(self, params)
1462 response = api_call(self, params)
1463 try:
1463 try:
1464 expected = 'failed to update user group `%s`' % gr_name
1464 expected = 'failed to update user group `%s`' % gr_name
1465 self._compare_error(id_, expected, given=response.body)
1465 self._compare_error(id_, expected, given=response.body)
1466 finally:
1466 finally:
1467 fixture.destroy_user_group(gr_name)
1467 fixture.destroy_user_group(gr_name)
1468
1468
1469 def test_api_add_user_to_user_group(self):
1469 def test_api_add_user_to_user_group(self):
1470 gr_name = 'test_group'
1470 gr_name = 'test_group'
1471 fixture.create_user_group(gr_name)
1471 fixture.create_user_group(gr_name)
1472 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1472 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1473 usergroupid=gr_name,
1473 usergroupid=gr_name,
1474 userid=TEST_USER_ADMIN_LOGIN)
1474 userid=TEST_USER_ADMIN_LOGIN)
1475 response = api_call(self, params)
1475 response = api_call(self, params)
1476 try:
1476 try:
1477 expected = {
1477 expected = {
1478 'msg': 'added member `%s` to user group `%s`' % (
1478 'msg': 'added member `%s` to user group `%s`' % (
1479 TEST_USER_ADMIN_LOGIN, gr_name),
1479 TEST_USER_ADMIN_LOGIN, gr_name),
1480 'success': True
1480 'success': True
1481 }
1481 }
1482 self._compare_ok(id_, expected, given=response.body)
1482 self._compare_ok(id_, expected, given=response.body)
1483 finally:
1483 finally:
1484 fixture.destroy_user_group(gr_name)
1484 fixture.destroy_user_group(gr_name)
1485
1485
1486 def test_api_add_user_to_user_group_that_doesnt_exist(self):
1486 def test_api_add_user_to_user_group_that_doesnt_exist(self):
1487 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1487 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1488 usergroupid='false-group',
1488 usergroupid='false-group',
1489 userid=TEST_USER_ADMIN_LOGIN)
1489 userid=TEST_USER_ADMIN_LOGIN)
1490 response = api_call(self, params)
1490 response = api_call(self, params)
1491
1491
1492 expected = 'user group `%s` does not exist' % 'false-group'
1492 expected = 'user group `%s` does not exist' % 'false-group'
1493 self._compare_error(id_, expected, given=response.body)
1493 self._compare_error(id_, expected, given=response.body)
1494
1494
1495 @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
1495 @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
1496 def test_api_add_user_to_user_group_exception_occurred(self):
1496 def test_api_add_user_to_user_group_exception_occurred(self):
1497 gr_name = 'test_group'
1497 gr_name = 'test_group'
1498 fixture.create_user_group(gr_name)
1498 fixture.create_user_group(gr_name)
1499 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1499 id_, params = _build_data(self.apikey, 'add_user_to_user_group',
1500 usergroupid=gr_name,
1500 usergroupid=gr_name,
1501 userid=TEST_USER_ADMIN_LOGIN)
1501 userid=TEST_USER_ADMIN_LOGIN)
1502 response = api_call(self, params)
1502 response = api_call(self, params)
1503
1503
1504 try:
1504 try:
1505 expected = 'failed to add member to user group `%s`' % gr_name
1505 expected = 'failed to add member to user group `%s`' % gr_name
1506 self._compare_error(id_, expected, given=response.body)
1506 self._compare_error(id_, expected, given=response.body)
1507 finally:
1507 finally:
1508 fixture.destroy_user_group(gr_name)
1508 fixture.destroy_user_group(gr_name)
1509
1509
1510 def test_api_remove_user_from_user_group(self):
1510 def test_api_remove_user_from_user_group(self):
1511 gr_name = 'test_group_3'
1511 gr_name = 'test_group_3'
1512 gr = fixture.create_user_group(gr_name)
1512 gr = fixture.create_user_group(gr_name)
1513 UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
1513 UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
1514 Session().commit()
1514 Session().commit()
1515 id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
1515 id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
1516 usergroupid=gr_name,
1516 usergroupid=gr_name,
1517 userid=TEST_USER_ADMIN_LOGIN)
1517 userid=TEST_USER_ADMIN_LOGIN)
1518 response = api_call(self, params)
1518 response = api_call(self, params)
1519
1519
1520 try:
1520 try:
1521 expected = {
1521 expected = {
1522 'msg': 'removed member `%s` from user group `%s`' % (
1522 'msg': 'removed member `%s` from user group `%s`' % (
1523 TEST_USER_ADMIN_LOGIN, gr_name
1523 TEST_USER_ADMIN_LOGIN, gr_name
1524 ),
1524 ),
1525 'success': True}
1525 'success': True}
1526 self._compare_ok(id_, expected, given=response.body)
1526 self._compare_ok(id_, expected, given=response.body)
1527 finally:
1527 finally:
1528 fixture.destroy_user_group(gr_name)
1528 fixture.destroy_user_group(gr_name)
1529
1529
1530 @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
1530 @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
1531 def test_api_remove_user_from_user_group_exception_occurred(self):
1531 def test_api_remove_user_from_user_group_exception_occurred(self):
1532 gr_name = 'test_group_3'
1532 gr_name = 'test_group_3'
1533 gr = fixture.create_user_group(gr_name)
1533 gr = fixture.create_user_group(gr_name)
1534 UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
1534 UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
1535 Session().commit()
1535 Session().commit()
1536 id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
1536 id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
1537 usergroupid=gr_name,
1537 usergroupid=gr_name,
1538 userid=TEST_USER_ADMIN_LOGIN)
1538 userid=TEST_USER_ADMIN_LOGIN)
1539 response = api_call(self, params)
1539 response = api_call(self, params)
1540 try:
1540 try:
1541 expected = 'failed to remove member from user group `%s`' % gr_name
1541 expected = 'failed to remove member from user group `%s`' % gr_name
1542 self._compare_error(id_, expected, given=response.body)
1542 self._compare_error(id_, expected, given=response.body)
1543 finally:
1543 finally:
1544 fixture.destroy_user_group(gr_name)
1544 fixture.destroy_user_group(gr_name)
1545
1545
1546 def test_api_delete_user_group(self):
1546 def test_api_delete_user_group(self):
1547 gr_name = 'test_group'
1547 gr_name = 'test_group'
1548 ugroup = fixture.create_user_group(gr_name)
1548 ugroup = fixture.create_user_group(gr_name)
1549 gr_id = ugroup.users_group_id
1549 gr_id = ugroup.users_group_id
1550 id_, params = _build_data(self.apikey, 'delete_user_group',
1550 id_, params = _build_data(self.apikey, 'delete_user_group',
1551 usergroupid=gr_name,
1551 usergroupid=gr_name,
1552 userid=TEST_USER_ADMIN_LOGIN)
1552 userid=TEST_USER_ADMIN_LOGIN)
1553 response = api_call(self, params)
1553 response = api_call(self, params)
1554
1554
1555 try:
1555 try:
1556 expected = {
1556 expected = {
1557 'user_group': None,
1557 'user_group': None,
1558 'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name)
1558 'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name)
1559 }
1559 }
1560 self._compare_ok(id_, expected, given=response.body)
1560 self._compare_ok(id_, expected, given=response.body)
1561 finally:
1561 finally:
1562 if UserGroupModel().get_by_name(gr_name):
1562 if UserGroupModel().get_by_name(gr_name):
1563 fixture.destroy_user_group(gr_name)
1563 fixture.destroy_user_group(gr_name)
1564
1564
1565 def test_api_delete_user_group_that_is_assigned(self):
1565 def test_api_delete_user_group_that_is_assigned(self):
1566 gr_name = 'test_group'
1566 gr_name = 'test_group'
1567 ugroup = fixture.create_user_group(gr_name)
1567 ugroup = fixture.create_user_group(gr_name)
1568 gr_id = ugroup.users_group_id
1568 gr_id = ugroup.users_group_id
1569
1569
1570 ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write')
1570 ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write')
1571 msg = 'RepoGroup assigned to [%s]' % (ugr_to_perm)
1571 msg = 'User Group assigned to %s' % ugr_to_perm.repository.repo_name
1572
1572
1573 id_, params = _build_data(self.apikey, 'delete_user_group',
1573 id_, params = _build_data(self.apikey, 'delete_user_group',
1574 usergroupid=gr_name,
1574 usergroupid=gr_name,
1575 userid=TEST_USER_ADMIN_LOGIN)
1575 userid=TEST_USER_ADMIN_LOGIN)
1576 response = api_call(self, params)
1576 response = api_call(self, params)
1577
1577
1578 try:
1578 try:
1579 expected = msg
1579 expected = msg
1580 self._compare_error(id_, expected, given=response.body)
1580 self._compare_error(id_, expected, given=response.body)
1581 finally:
1581 finally:
1582 if UserGroupModel().get_by_name(gr_name):
1582 if UserGroupModel().get_by_name(gr_name):
1583 fixture.destroy_user_group(gr_name)
1583 fixture.destroy_user_group(gr_name)
1584
1584
1585 def test_api_delete_user_group_exception_occured(self):
1585 def test_api_delete_user_group_exception_occured(self):
1586 gr_name = 'test_group'
1586 gr_name = 'test_group'
1587 ugroup = fixture.create_user_group(gr_name)
1587 ugroup = fixture.create_user_group(gr_name)
1588 gr_id = ugroup.users_group_id
1588 gr_id = ugroup.users_group_id
1589 id_, params = _build_data(self.apikey, 'delete_user_group',
1589 id_, params = _build_data(self.apikey, 'delete_user_group',
1590 usergroupid=gr_name,
1590 usergroupid=gr_name,
1591 userid=TEST_USER_ADMIN_LOGIN)
1591 userid=TEST_USER_ADMIN_LOGIN)
1592
1592
1593 try:
1593 try:
1594 with mock.patch.object(UserGroupModel, 'delete', crash):
1594 with mock.patch.object(UserGroupModel, 'delete', crash):
1595 response = api_call(self, params)
1595 response = api_call(self, params)
1596 expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
1596 expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
1597 self._compare_error(id_, expected, given=response.body)
1597 self._compare_error(id_, expected, given=response.body)
1598 finally:
1598 finally:
1599 fixture.destroy_user_group(gr_name)
1599 fixture.destroy_user_group(gr_name)
1600
1600
1601 @parameterized.expand([('none', 'repository.none'),
1601 @parameterized.expand([('none', 'repository.none'),
1602 ('read', 'repository.read'),
1602 ('read', 'repository.read'),
1603 ('write', 'repository.write'),
1603 ('write', 'repository.write'),
1604 ('admin', 'repository.admin')])
1604 ('admin', 'repository.admin')])
1605 def test_api_grant_user_permission(self, name, perm):
1605 def test_api_grant_user_permission(self, name, perm):
1606 id_, params = _build_data(self.apikey,
1606 id_, params = _build_data(self.apikey,
1607 'grant_user_permission',
1607 'grant_user_permission',
1608 repoid=self.REPO,
1608 repoid=self.REPO,
1609 userid=TEST_USER_ADMIN_LOGIN,
1609 userid=TEST_USER_ADMIN_LOGIN,
1610 perm=perm)
1610 perm=perm)
1611 response = api_call(self, params)
1611 response = api_call(self, params)
1612
1612
1613 ret = {
1613 ret = {
1614 'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
1614 'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
1615 perm, TEST_USER_ADMIN_LOGIN, self.REPO
1615 perm, TEST_USER_ADMIN_LOGIN, self.REPO
1616 ),
1616 ),
1617 'success': True
1617 'success': True
1618 }
1618 }
1619 expected = ret
1619 expected = ret
1620 self._compare_ok(id_, expected, given=response.body)
1620 self._compare_ok(id_, expected, given=response.body)
1621
1621
1622 def test_api_grant_user_permission_wrong_permission(self):
1622 def test_api_grant_user_permission_wrong_permission(self):
1623 perm = 'haha.no.permission'
1623 perm = 'haha.no.permission'
1624 id_, params = _build_data(self.apikey,
1624 id_, params = _build_data(self.apikey,
1625 'grant_user_permission',
1625 'grant_user_permission',
1626 repoid=self.REPO,
1626 repoid=self.REPO,
1627 userid=TEST_USER_ADMIN_LOGIN,
1627 userid=TEST_USER_ADMIN_LOGIN,
1628 perm=perm)
1628 perm=perm)
1629 response = api_call(self, params)
1629 response = api_call(self, params)
1630
1630
1631 expected = 'permission `%s` does not exist' % perm
1631 expected = 'permission `%s` does not exist' % perm
1632 self._compare_error(id_, expected, given=response.body)
1632 self._compare_error(id_, expected, given=response.body)
1633
1633
1634 @mock.patch.object(RepoModel, 'grant_user_permission', crash)
1634 @mock.patch.object(RepoModel, 'grant_user_permission', crash)
1635 def test_api_grant_user_permission_exception_when_adding(self):
1635 def test_api_grant_user_permission_exception_when_adding(self):
1636 perm = 'repository.read'
1636 perm = 'repository.read'
1637 id_, params = _build_data(self.apikey,
1637 id_, params = _build_data(self.apikey,
1638 'grant_user_permission',
1638 'grant_user_permission',
1639 repoid=self.REPO,
1639 repoid=self.REPO,
1640 userid=TEST_USER_ADMIN_LOGIN,
1640 userid=TEST_USER_ADMIN_LOGIN,
1641 perm=perm)
1641 perm=perm)
1642 response = api_call(self, params)
1642 response = api_call(self, params)
1643
1643
1644 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1644 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1645 TEST_USER_ADMIN_LOGIN, self.REPO
1645 TEST_USER_ADMIN_LOGIN, self.REPO
1646 )
1646 )
1647 self._compare_error(id_, expected, given=response.body)
1647 self._compare_error(id_, expected, given=response.body)
1648
1648
1649 def test_api_revoke_user_permission(self):
1649 def test_api_revoke_user_permission(self):
1650 id_, params = _build_data(self.apikey,
1650 id_, params = _build_data(self.apikey,
1651 'revoke_user_permission',
1651 'revoke_user_permission',
1652 repoid=self.REPO,
1652 repoid=self.REPO,
1653 userid=TEST_USER_ADMIN_LOGIN, )
1653 userid=TEST_USER_ADMIN_LOGIN, )
1654 response = api_call(self, params)
1654 response = api_call(self, params)
1655
1655
1656 expected = {
1656 expected = {
1657 'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
1657 'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
1658 TEST_USER_ADMIN_LOGIN, self.REPO
1658 TEST_USER_ADMIN_LOGIN, self.REPO
1659 ),
1659 ),
1660 'success': True
1660 'success': True
1661 }
1661 }
1662 self._compare_ok(id_, expected, given=response.body)
1662 self._compare_ok(id_, expected, given=response.body)
1663
1663
1664 @mock.patch.object(RepoModel, 'revoke_user_permission', crash)
1664 @mock.patch.object(RepoModel, 'revoke_user_permission', crash)
1665 def test_api_revoke_user_permission_exception_when_adding(self):
1665 def test_api_revoke_user_permission_exception_when_adding(self):
1666 id_, params = _build_data(self.apikey,
1666 id_, params = _build_data(self.apikey,
1667 'revoke_user_permission',
1667 'revoke_user_permission',
1668 repoid=self.REPO,
1668 repoid=self.REPO,
1669 userid=TEST_USER_ADMIN_LOGIN, )
1669 userid=TEST_USER_ADMIN_LOGIN, )
1670 response = api_call(self, params)
1670 response = api_call(self, params)
1671
1671
1672 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1672 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1673 TEST_USER_ADMIN_LOGIN, self.REPO
1673 TEST_USER_ADMIN_LOGIN, self.REPO
1674 )
1674 )
1675 self._compare_error(id_, expected, given=response.body)
1675 self._compare_error(id_, expected, given=response.body)
1676
1676
1677 @parameterized.expand([('none', 'repository.none'),
1677 @parameterized.expand([('none', 'repository.none'),
1678 ('read', 'repository.read'),
1678 ('read', 'repository.read'),
1679 ('write', 'repository.write'),
1679 ('write', 'repository.write'),
1680 ('admin', 'repository.admin')])
1680 ('admin', 'repository.admin')])
1681 def test_api_grant_user_group_permission(self, name, perm):
1681 def test_api_grant_user_group_permission(self, name, perm):
1682 id_, params = _build_data(self.apikey,
1682 id_, params = _build_data(self.apikey,
1683 'grant_user_group_permission',
1683 'grant_user_group_permission',
1684 repoid=self.REPO,
1684 repoid=self.REPO,
1685 usergroupid=TEST_USER_GROUP,
1685 usergroupid=TEST_USER_GROUP,
1686 perm=perm)
1686 perm=perm)
1687 response = api_call(self, params)
1687 response = api_call(self, params)
1688
1688
1689 ret = {
1689 ret = {
1690 'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % (
1690 'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % (
1691 perm, TEST_USER_GROUP, self.REPO
1691 perm, TEST_USER_GROUP, self.REPO
1692 ),
1692 ),
1693 'success': True
1693 'success': True
1694 }
1694 }
1695 expected = ret
1695 expected = ret
1696 self._compare_ok(id_, expected, given=response.body)
1696 self._compare_ok(id_, expected, given=response.body)
1697
1697
1698 def test_api_grant_user_group_permission_wrong_permission(self):
1698 def test_api_grant_user_group_permission_wrong_permission(self):
1699 perm = 'haha.no.permission'
1699 perm = 'haha.no.permission'
1700 id_, params = _build_data(self.apikey,
1700 id_, params = _build_data(self.apikey,
1701 'grant_user_group_permission',
1701 'grant_user_group_permission',
1702 repoid=self.REPO,
1702 repoid=self.REPO,
1703 usergroupid=TEST_USER_GROUP,
1703 usergroupid=TEST_USER_GROUP,
1704 perm=perm)
1704 perm=perm)
1705 response = api_call(self, params)
1705 response = api_call(self, params)
1706
1706
1707 expected = 'permission `%s` does not exist' % perm
1707 expected = 'permission `%s` does not exist' % perm
1708 self._compare_error(id_, expected, given=response.body)
1708 self._compare_error(id_, expected, given=response.body)
1709
1709
1710 @mock.patch.object(RepoModel, 'grant_user_group_permission', crash)
1710 @mock.patch.object(RepoModel, 'grant_user_group_permission', crash)
1711 def test_api_grant_user_group_permission_exception_when_adding(self):
1711 def test_api_grant_user_group_permission_exception_when_adding(self):
1712 perm = 'repository.read'
1712 perm = 'repository.read'
1713 id_, params = _build_data(self.apikey,
1713 id_, params = _build_data(self.apikey,
1714 'grant_user_group_permission',
1714 'grant_user_group_permission',
1715 repoid=self.REPO,
1715 repoid=self.REPO,
1716 usergroupid=TEST_USER_GROUP,
1716 usergroupid=TEST_USER_GROUP,
1717 perm=perm)
1717 perm=perm)
1718 response = api_call(self, params)
1718 response = api_call(self, params)
1719
1719
1720 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1720 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1721 TEST_USER_GROUP, self.REPO
1721 TEST_USER_GROUP, self.REPO
1722 )
1722 )
1723 self._compare_error(id_, expected, given=response.body)
1723 self._compare_error(id_, expected, given=response.body)
1724
1724
1725 def test_api_revoke_user_group_permission(self):
1725 def test_api_revoke_user_group_permission(self):
1726 RepoModel().grant_user_group_permission(repo=self.REPO,
1726 RepoModel().grant_user_group_permission(repo=self.REPO,
1727 group_name=TEST_USER_GROUP,
1727 group_name=TEST_USER_GROUP,
1728 perm='repository.read')
1728 perm='repository.read')
1729 Session().commit()
1729 Session().commit()
1730 id_, params = _build_data(self.apikey,
1730 id_, params = _build_data(self.apikey,
1731 'revoke_user_group_permission',
1731 'revoke_user_group_permission',
1732 repoid=self.REPO,
1732 repoid=self.REPO,
1733 usergroupid=TEST_USER_GROUP, )
1733 usergroupid=TEST_USER_GROUP, )
1734 response = api_call(self, params)
1734 response = api_call(self, params)
1735
1735
1736 expected = {
1736 expected = {
1737 'msg': 'Revoked perm for user group: `%s` in repo: `%s`' % (
1737 'msg': 'Revoked perm for user group: `%s` in repo: `%s`' % (
1738 TEST_USER_GROUP, self.REPO
1738 TEST_USER_GROUP, self.REPO
1739 ),
1739 ),
1740 'success': True
1740 'success': True
1741 }
1741 }
1742 self._compare_ok(id_, expected, given=response.body)
1742 self._compare_ok(id_, expected, given=response.body)
1743
1743
1744 @mock.patch.object(RepoModel, 'revoke_user_group_permission', crash)
1744 @mock.patch.object(RepoModel, 'revoke_user_group_permission', crash)
1745 def test_api_revoke_user_group_permission_exception_when_adding(self):
1745 def test_api_revoke_user_group_permission_exception_when_adding(self):
1746 id_, params = _build_data(self.apikey,
1746 id_, params = _build_data(self.apikey,
1747 'revoke_user_group_permission',
1747 'revoke_user_group_permission',
1748 repoid=self.REPO,
1748 repoid=self.REPO,
1749 usergroupid=TEST_USER_GROUP, )
1749 usergroupid=TEST_USER_GROUP, )
1750 response = api_call(self, params)
1750 response = api_call(self, params)
1751
1751
1752 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1752 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1753 TEST_USER_GROUP, self.REPO
1753 TEST_USER_GROUP, self.REPO
1754 )
1754 )
1755 self._compare_error(id_, expected, given=response.body)
1755 self._compare_error(id_, expected, given=response.body)
1756
1756
1757 @parameterized.expand([
1757 @parameterized.expand([
1758 ('none', 'group.none', 'none'),
1758 ('none', 'group.none', 'none'),
1759 ('read', 'group.read', 'none'),
1759 ('read', 'group.read', 'none'),
1760 ('write', 'group.write', 'none'),
1760 ('write', 'group.write', 'none'),
1761 ('admin', 'group.admin', 'none'),
1761 ('admin', 'group.admin', 'none'),
1762
1762
1763 ('none', 'group.none', 'all'),
1763 ('none', 'group.none', 'all'),
1764 ('read', 'group.read', 'all'),
1764 ('read', 'group.read', 'all'),
1765 ('write', 'group.write', 'all'),
1765 ('write', 'group.write', 'all'),
1766 ('admin', 'group.admin', 'all'),
1766 ('admin', 'group.admin', 'all'),
1767
1767
1768 ('none', 'group.none', 'repos'),
1768 ('none', 'group.none', 'repos'),
1769 ('read', 'group.read', 'repos'),
1769 ('read', 'group.read', 'repos'),
1770 ('write', 'group.write', 'repos'),
1770 ('write', 'group.write', 'repos'),
1771 ('admin', 'group.admin', 'repos'),
1771 ('admin', 'group.admin', 'repos'),
1772
1772
1773 ('none', 'group.none', 'groups'),
1773 ('none', 'group.none', 'groups'),
1774 ('read', 'group.read', 'groups'),
1774 ('read', 'group.read', 'groups'),
1775 ('write', 'group.write', 'groups'),
1775 ('write', 'group.write', 'groups'),
1776 ('admin', 'group.admin', 'groups'),
1776 ('admin', 'group.admin', 'groups'),
1777 ])
1777 ])
1778 def test_api_grant_user_permission_to_repo_group(self, name, perm, apply_to_children):
1778 def test_api_grant_user_permission_to_repo_group(self, name, perm, apply_to_children):
1779 id_, params = _build_data(self.apikey,
1779 id_, params = _build_data(self.apikey,
1780 'grant_user_permission_to_repo_group',
1780 'grant_user_permission_to_repo_group',
1781 repogroupid=TEST_REPO_GROUP,
1781 repogroupid=TEST_REPO_GROUP,
1782 userid=TEST_USER_ADMIN_LOGIN,
1782 userid=TEST_USER_ADMIN_LOGIN,
1783 perm=perm, apply_to_children=apply_to_children)
1783 perm=perm, apply_to_children=apply_to_children)
1784 response = api_call(self, params)
1784 response = api_call(self, params)
1785
1785
1786 ret = {
1786 ret = {
1787 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1787 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1788 perm, apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1788 perm, apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1789 ),
1789 ),
1790 'success': True
1790 'success': True
1791 }
1791 }
1792 expected = ret
1792 expected = ret
1793 self._compare_ok(id_, expected, given=response.body)
1793 self._compare_ok(id_, expected, given=response.body)
1794
1794
1795 @parameterized.expand([
1795 @parameterized.expand([
1796 ('none_fails', 'group.none', 'none', False, False),
1796 ('none_fails', 'group.none', 'none', False, False),
1797 ('read_fails', 'group.read', 'none', False, False),
1797 ('read_fails', 'group.read', 'none', False, False),
1798 ('write_fails', 'group.write', 'none', False, False),
1798 ('write_fails', 'group.write', 'none', False, False),
1799 ('admin_fails', 'group.admin', 'none', False, False),
1799 ('admin_fails', 'group.admin', 'none', False, False),
1800
1800
1801 # with granted perms
1801 # with granted perms
1802 ('none_ok', 'group.none', 'none', True, True),
1802 ('none_ok', 'group.none', 'none', True, True),
1803 ('read_ok', 'group.read', 'none', True, True),
1803 ('read_ok', 'group.read', 'none', True, True),
1804 ('write_ok', 'group.write', 'none', True, True),
1804 ('write_ok', 'group.write', 'none', True, True),
1805 ('admin_ok', 'group.admin', 'none', True, True),
1805 ('admin_ok', 'group.admin', 'none', True, True),
1806 ])
1806 ])
1807 def test_api_grant_user_permission_to_repo_group_by_regular_user(
1807 def test_api_grant_user_permission_to_repo_group_by_regular_user(
1808 self, name, perm, apply_to_children, grant_admin, access_ok):
1808 self, name, perm, apply_to_children, grant_admin, access_ok):
1809 if grant_admin:
1809 if grant_admin:
1810 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1810 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1811 self.TEST_USER_LOGIN,
1811 self.TEST_USER_LOGIN,
1812 'group.admin')
1812 'group.admin')
1813 Session().commit()
1813 Session().commit()
1814
1814
1815 id_, params = _build_data(self.apikey_regular,
1815 id_, params = _build_data(self.apikey_regular,
1816 'grant_user_permission_to_repo_group',
1816 'grant_user_permission_to_repo_group',
1817 repogroupid=TEST_REPO_GROUP,
1817 repogroupid=TEST_REPO_GROUP,
1818 userid=TEST_USER_ADMIN_LOGIN,
1818 userid=TEST_USER_ADMIN_LOGIN,
1819 perm=perm, apply_to_children=apply_to_children)
1819 perm=perm, apply_to_children=apply_to_children)
1820 response = api_call(self, params)
1820 response = api_call(self, params)
1821 if access_ok:
1821 if access_ok:
1822 ret = {
1822 ret = {
1823 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1823 'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
1824 perm, apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1824 perm, apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1825 ),
1825 ),
1826 'success': True
1826 'success': True
1827 }
1827 }
1828 expected = ret
1828 expected = ret
1829 self._compare_ok(id_, expected, given=response.body)
1829 self._compare_ok(id_, expected, given=response.body)
1830 else:
1830 else:
1831 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
1831 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
1832 self._compare_error(id_, expected, given=response.body)
1832 self._compare_error(id_, expected, given=response.body)
1833
1833
1834 def test_api_grant_user_permission_to_repo_group_wrong_permission(self):
1834 def test_api_grant_user_permission_to_repo_group_wrong_permission(self):
1835 perm = 'haha.no.permission'
1835 perm = 'haha.no.permission'
1836 id_, params = _build_data(self.apikey,
1836 id_, params = _build_data(self.apikey,
1837 'grant_user_permission_to_repo_group',
1837 'grant_user_permission_to_repo_group',
1838 repogroupid=TEST_REPO_GROUP,
1838 repogroupid=TEST_REPO_GROUP,
1839 userid=TEST_USER_ADMIN_LOGIN,
1839 userid=TEST_USER_ADMIN_LOGIN,
1840 perm=perm)
1840 perm=perm)
1841 response = api_call(self, params)
1841 response = api_call(self, params)
1842
1842
1843 expected = 'permission `%s` does not exist' % perm
1843 expected = 'permission `%s` does not exist' % perm
1844 self._compare_error(id_, expected, given=response.body)
1844 self._compare_error(id_, expected, given=response.body)
1845
1845
1846 @mock.patch.object(RepoGroupModel, 'grant_user_permission', crash)
1846 @mock.patch.object(RepoGroupModel, 'grant_user_permission', crash)
1847 def test_api_grant_user_permission_to_repo_group_exception_when_adding(self):
1847 def test_api_grant_user_permission_to_repo_group_exception_when_adding(self):
1848 perm = 'group.read'
1848 perm = 'group.read'
1849 id_, params = _build_data(self.apikey,
1849 id_, params = _build_data(self.apikey,
1850 'grant_user_permission_to_repo_group',
1850 'grant_user_permission_to_repo_group',
1851 repogroupid=TEST_REPO_GROUP,
1851 repogroupid=TEST_REPO_GROUP,
1852 userid=TEST_USER_ADMIN_LOGIN,
1852 userid=TEST_USER_ADMIN_LOGIN,
1853 perm=perm)
1853 perm=perm)
1854 response = api_call(self, params)
1854 response = api_call(self, params)
1855
1855
1856 expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
1856 expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
1857 TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1857 TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1858 )
1858 )
1859 self._compare_error(id_, expected, given=response.body)
1859 self._compare_error(id_, expected, given=response.body)
1860
1860
1861 @parameterized.expand([
1861 @parameterized.expand([
1862 ('none', 'none'),
1862 ('none', 'none'),
1863 ('all', 'all'),
1863 ('all', 'all'),
1864 ('repos', 'repos'),
1864 ('repos', 'repos'),
1865 ('groups', 'groups'),
1865 ('groups', 'groups'),
1866 ])
1866 ])
1867 def test_api_revoke_user_permission_from_repo_group(self, name, apply_to_children):
1867 def test_api_revoke_user_permission_from_repo_group(self, name, apply_to_children):
1868 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
1868 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
1869 user=TEST_USER_ADMIN_LOGIN,
1869 user=TEST_USER_ADMIN_LOGIN,
1870 perm='group.read',)
1870 perm='group.read',)
1871 Session().commit()
1871 Session().commit()
1872
1872
1873 id_, params = _build_data(self.apikey,
1873 id_, params = _build_data(self.apikey,
1874 'revoke_user_permission_from_repo_group',
1874 'revoke_user_permission_from_repo_group',
1875 repogroupid=TEST_REPO_GROUP,
1875 repogroupid=TEST_REPO_GROUP,
1876 userid=TEST_USER_ADMIN_LOGIN,
1876 userid=TEST_USER_ADMIN_LOGIN,
1877 apply_to_children=apply_to_children,)
1877 apply_to_children=apply_to_children,)
1878 response = api_call(self, params)
1878 response = api_call(self, params)
1879
1879
1880 expected = {
1880 expected = {
1881 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
1881 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
1882 apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1882 apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1883 ),
1883 ),
1884 'success': True
1884 'success': True
1885 }
1885 }
1886 self._compare_ok(id_, expected, given=response.body)
1886 self._compare_ok(id_, expected, given=response.body)
1887
1887
1888 @parameterized.expand([
1888 @parameterized.expand([
1889 ('none', 'none', False, False),
1889 ('none', 'none', False, False),
1890 ('all', 'all', False, False),
1890 ('all', 'all', False, False),
1891 ('repos', 'repos', False, False),
1891 ('repos', 'repos', False, False),
1892 ('groups', 'groups', False, False),
1892 ('groups', 'groups', False, False),
1893
1893
1894 # after granting admin rights
1894 # after granting admin rights
1895 ('none', 'none', False, False),
1895 ('none', 'none', False, False),
1896 ('all', 'all', False, False),
1896 ('all', 'all', False, False),
1897 ('repos', 'repos', False, False),
1897 ('repos', 'repos', False, False),
1898 ('groups', 'groups', False, False),
1898 ('groups', 'groups', False, False),
1899 ])
1899 ])
1900 def test_api_revoke_user_permission_from_repo_group_by_regular_user(
1900 def test_api_revoke_user_permission_from_repo_group_by_regular_user(
1901 self, name, apply_to_children, grant_admin, access_ok):
1901 self, name, apply_to_children, grant_admin, access_ok):
1902 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
1902 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
1903 user=TEST_USER_ADMIN_LOGIN,
1903 user=TEST_USER_ADMIN_LOGIN,
1904 perm='group.read',)
1904 perm='group.read',)
1905 Session().commit()
1905 Session().commit()
1906
1906
1907 if grant_admin:
1907 if grant_admin:
1908 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1908 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1909 self.TEST_USER_LOGIN,
1909 self.TEST_USER_LOGIN,
1910 'group.admin')
1910 'group.admin')
1911 Session().commit()
1911 Session().commit()
1912
1912
1913 id_, params = _build_data(self.apikey_regular,
1913 id_, params = _build_data(self.apikey_regular,
1914 'revoke_user_permission_from_repo_group',
1914 'revoke_user_permission_from_repo_group',
1915 repogroupid=TEST_REPO_GROUP,
1915 repogroupid=TEST_REPO_GROUP,
1916 userid=TEST_USER_ADMIN_LOGIN,
1916 userid=TEST_USER_ADMIN_LOGIN,
1917 apply_to_children=apply_to_children,)
1917 apply_to_children=apply_to_children,)
1918 response = api_call(self, params)
1918 response = api_call(self, params)
1919 if access_ok:
1919 if access_ok:
1920 expected = {
1920 expected = {
1921 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
1921 'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
1922 apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1922 apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1923 ),
1923 ),
1924 'success': True
1924 'success': True
1925 }
1925 }
1926 self._compare_ok(id_, expected, given=response.body)
1926 self._compare_ok(id_, expected, given=response.body)
1927 else:
1927 else:
1928 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
1928 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
1929 self._compare_error(id_, expected, given=response.body)
1929 self._compare_error(id_, expected, given=response.body)
1930
1930
1931 @mock.patch.object(RepoGroupModel, 'revoke_user_permission', crash)
1931 @mock.patch.object(RepoGroupModel, 'revoke_user_permission', crash)
1932 def test_api_revoke_user_permission_from_repo_group_exception_when_adding(self):
1932 def test_api_revoke_user_permission_from_repo_group_exception_when_adding(self):
1933 id_, params = _build_data(self.apikey,
1933 id_, params = _build_data(self.apikey,
1934 'revoke_user_permission_from_repo_group',
1934 'revoke_user_permission_from_repo_group',
1935 repogroupid=TEST_REPO_GROUP,
1935 repogroupid=TEST_REPO_GROUP,
1936 userid=TEST_USER_ADMIN_LOGIN, )
1936 userid=TEST_USER_ADMIN_LOGIN, )
1937 response = api_call(self, params)
1937 response = api_call(self, params)
1938
1938
1939 expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
1939 expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
1940 TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1940 TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
1941 )
1941 )
1942 self._compare_error(id_, expected, given=response.body)
1942 self._compare_error(id_, expected, given=response.body)
1943
1943
1944 @parameterized.expand([
1944 @parameterized.expand([
1945 ('none', 'group.none', 'none'),
1945 ('none', 'group.none', 'none'),
1946 ('read', 'group.read', 'none'),
1946 ('read', 'group.read', 'none'),
1947 ('write', 'group.write', 'none'),
1947 ('write', 'group.write', 'none'),
1948 ('admin', 'group.admin', 'none'),
1948 ('admin', 'group.admin', 'none'),
1949
1949
1950 ('none', 'group.none', 'all'),
1950 ('none', 'group.none', 'all'),
1951 ('read', 'group.read', 'all'),
1951 ('read', 'group.read', 'all'),
1952 ('write', 'group.write', 'all'),
1952 ('write', 'group.write', 'all'),
1953 ('admin', 'group.admin', 'all'),
1953 ('admin', 'group.admin', 'all'),
1954
1954
1955 ('none', 'group.none', 'repos'),
1955 ('none', 'group.none', 'repos'),
1956 ('read', 'group.read', 'repos'),
1956 ('read', 'group.read', 'repos'),
1957 ('write', 'group.write', 'repos'),
1957 ('write', 'group.write', 'repos'),
1958 ('admin', 'group.admin', 'repos'),
1958 ('admin', 'group.admin', 'repos'),
1959
1959
1960 ('none', 'group.none', 'groups'),
1960 ('none', 'group.none', 'groups'),
1961 ('read', 'group.read', 'groups'),
1961 ('read', 'group.read', 'groups'),
1962 ('write', 'group.write', 'groups'),
1962 ('write', 'group.write', 'groups'),
1963 ('admin', 'group.admin', 'groups'),
1963 ('admin', 'group.admin', 'groups'),
1964 ])
1964 ])
1965 def test_api_grant_user_group_permission_to_repo_group(self, name, perm, apply_to_children):
1965 def test_api_grant_user_group_permission_to_repo_group(self, name, perm, apply_to_children):
1966 id_, params = _build_data(self.apikey,
1966 id_, params = _build_data(self.apikey,
1967 'grant_user_group_permission_to_repo_group',
1967 'grant_user_group_permission_to_repo_group',
1968 repogroupid=TEST_REPO_GROUP,
1968 repogroupid=TEST_REPO_GROUP,
1969 usergroupid=TEST_USER_GROUP,
1969 usergroupid=TEST_USER_GROUP,
1970 perm=perm,
1970 perm=perm,
1971 apply_to_children=apply_to_children,)
1971 apply_to_children=apply_to_children,)
1972 response = api_call(self, params)
1972 response = api_call(self, params)
1973
1973
1974 ret = {
1974 ret = {
1975 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
1975 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
1976 perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
1976 perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
1977 ),
1977 ),
1978 'success': True
1978 'success': True
1979 }
1979 }
1980 expected = ret
1980 expected = ret
1981 self._compare_ok(id_, expected, given=response.body)
1981 self._compare_ok(id_, expected, given=response.body)
1982
1982
1983 @parameterized.expand([
1983 @parameterized.expand([
1984 ('none_fails', 'group.none', 'none', False, False),
1984 ('none_fails', 'group.none', 'none', False, False),
1985 ('read_fails', 'group.read', 'none', False, False),
1985 ('read_fails', 'group.read', 'none', False, False),
1986 ('write_fails', 'group.write', 'none', False, False),
1986 ('write_fails', 'group.write', 'none', False, False),
1987 ('admin_fails', 'group.admin', 'none', False, False),
1987 ('admin_fails', 'group.admin', 'none', False, False),
1988
1988
1989 # with granted perms
1989 # with granted perms
1990 ('none_ok', 'group.none', 'none', True, True),
1990 ('none_ok', 'group.none', 'none', True, True),
1991 ('read_ok', 'group.read', 'none', True, True),
1991 ('read_ok', 'group.read', 'none', True, True),
1992 ('write_ok', 'group.write', 'none', True, True),
1992 ('write_ok', 'group.write', 'none', True, True),
1993 ('admin_ok', 'group.admin', 'none', True, True),
1993 ('admin_ok', 'group.admin', 'none', True, True),
1994 ])
1994 ])
1995 def test_api_grant_user_group_permission_to_repo_group_by_regular_user(
1995 def test_api_grant_user_group_permission_to_repo_group_by_regular_user(
1996 self, name, perm, apply_to_children, grant_admin, access_ok):
1996 self, name, perm, apply_to_children, grant_admin, access_ok):
1997 if grant_admin:
1997 if grant_admin:
1998 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1998 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
1999 self.TEST_USER_LOGIN,
1999 self.TEST_USER_LOGIN,
2000 'group.admin')
2000 'group.admin')
2001 Session().commit()
2001 Session().commit()
2002
2002
2003 id_, params = _build_data(self.apikey_regular,
2003 id_, params = _build_data(self.apikey_regular,
2004 'grant_user_group_permission_to_repo_group',
2004 'grant_user_group_permission_to_repo_group',
2005 repogroupid=TEST_REPO_GROUP,
2005 repogroupid=TEST_REPO_GROUP,
2006 usergroupid=TEST_USER_GROUP,
2006 usergroupid=TEST_USER_GROUP,
2007 perm=perm,
2007 perm=perm,
2008 apply_to_children=apply_to_children,)
2008 apply_to_children=apply_to_children,)
2009 response = api_call(self, params)
2009 response = api_call(self, params)
2010 if access_ok:
2010 if access_ok:
2011 ret = {
2011 ret = {
2012 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2012 'msg': 'Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2013 perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2013 perm, apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2014 ),
2014 ),
2015 'success': True
2015 'success': True
2016 }
2016 }
2017 expected = ret
2017 expected = ret
2018 self._compare_ok(id_, expected, given=response.body)
2018 self._compare_ok(id_, expected, given=response.body)
2019 else:
2019 else:
2020 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2020 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2021 self._compare_error(id_, expected, given=response.body)
2021 self._compare_error(id_, expected, given=response.body)
2022
2022
2023 def test_api_grant_user_group_permission_to_repo_group_wrong_permission(self):
2023 def test_api_grant_user_group_permission_to_repo_group_wrong_permission(self):
2024 perm = 'haha.no.permission'
2024 perm = 'haha.no.permission'
2025 id_, params = _build_data(self.apikey,
2025 id_, params = _build_data(self.apikey,
2026 'grant_user_group_permission_to_repo_group',
2026 'grant_user_group_permission_to_repo_group',
2027 repogroupid=TEST_REPO_GROUP,
2027 repogroupid=TEST_REPO_GROUP,
2028 usergroupid=TEST_USER_GROUP,
2028 usergroupid=TEST_USER_GROUP,
2029 perm=perm)
2029 perm=perm)
2030 response = api_call(self, params)
2030 response = api_call(self, params)
2031
2031
2032 expected = 'permission `%s` does not exist' % perm
2032 expected = 'permission `%s` does not exist' % perm
2033 self._compare_error(id_, expected, given=response.body)
2033 self._compare_error(id_, expected, given=response.body)
2034
2034
2035 @mock.patch.object(RepoGroupModel, 'grant_user_group_permission', crash)
2035 @mock.patch.object(RepoGroupModel, 'grant_user_group_permission', crash)
2036 def test_api_grant_user_group_permission_exception_when_adding(self):
2036 def test_api_grant_user_group_permission_exception_when_adding(self):
2037 perm = 'group.read'
2037 perm = 'group.read'
2038 id_, params = _build_data(self.apikey,
2038 id_, params = _build_data(self.apikey,
2039 'grant_user_group_permission_to_repo_group',
2039 'grant_user_group_permission_to_repo_group',
2040 repogroupid=TEST_REPO_GROUP,
2040 repogroupid=TEST_REPO_GROUP,
2041 usergroupid=TEST_USER_GROUP,
2041 usergroupid=TEST_USER_GROUP,
2042 perm=perm)
2042 perm=perm)
2043 response = api_call(self, params)
2043 response = api_call(self, params)
2044
2044
2045 expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2045 expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2046 TEST_USER_GROUP, TEST_REPO_GROUP
2046 TEST_USER_GROUP, TEST_REPO_GROUP
2047 )
2047 )
2048 self._compare_error(id_, expected, given=response.body)
2048 self._compare_error(id_, expected, given=response.body)
2049
2049
2050 @parameterized.expand([
2050 @parameterized.expand([
2051 ('none', 'none'),
2051 ('none', 'none'),
2052 ('all', 'all'),
2052 ('all', 'all'),
2053 ('repos', 'repos'),
2053 ('repos', 'repos'),
2054 ('groups', 'groups'),
2054 ('groups', 'groups'),
2055 ])
2055 ])
2056 def test_api_revoke_user_group_permission_from_repo_group(self, name, apply_to_children):
2056 def test_api_revoke_user_group_permission_from_repo_group(self, name, apply_to_children):
2057 RepoGroupModel().grant_user_group_permission(repo_group=TEST_REPO_GROUP,
2057 RepoGroupModel().grant_user_group_permission(repo_group=TEST_REPO_GROUP,
2058 group_name=TEST_USER_GROUP,
2058 group_name=TEST_USER_GROUP,
2059 perm='group.read',)
2059 perm='group.read',)
2060 Session().commit()
2060 Session().commit()
2061 id_, params = _build_data(self.apikey,
2061 id_, params = _build_data(self.apikey,
2062 'revoke_user_group_permission_from_repo_group',
2062 'revoke_user_group_permission_from_repo_group',
2063 repogroupid=TEST_REPO_GROUP,
2063 repogroupid=TEST_REPO_GROUP,
2064 usergroupid=TEST_USER_GROUP,
2064 usergroupid=TEST_USER_GROUP,
2065 apply_to_children=apply_to_children,)
2065 apply_to_children=apply_to_children,)
2066 response = api_call(self, params)
2066 response = api_call(self, params)
2067
2067
2068 expected = {
2068 expected = {
2069 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2069 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2070 apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2070 apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
2071 ),
2071 ),
2072 'success': True
2072 'success': True
2073 }
2073 }
2074 self._compare_ok(id_, expected, given=response.body)
2074 self._compare_ok(id_, expected, given=response.body)
2075
2075
2076 @parameterized.expand([
2076 @parameterized.expand([
2077 ('none', 'none', False, False),
2077 ('none', 'none', False, False),
2078 ('all', 'all', False, False),
2078 ('all', 'all', False, False),
2079 ('repos', 'repos', False, False),
2079 ('repos', 'repos', False, False),
2080 ('groups', 'groups', False, False),
2080 ('groups', 'groups', False, False),
2081
2081
2082 # after granting admin rights
2082 # after granting admin rights
2083 ('none', 'none', False, False),
2083 ('none', 'none', False, False),
2084 ('all', 'all', False, False),
2084 ('all', 'all', False, False),
2085 ('repos', 'repos', False, False),
2085 ('repos', 'repos', False, False),
2086 ('groups', 'groups', False, False),
2086 ('groups', 'groups', False, False),
2087 ])
2087 ])
2088 def test_api_revoke_user_group_permission_from_repo_group_by_regular_user(
2088 def test_api_revoke_user_group_permission_from_repo_group_by_regular_user(
2089 self, name, apply_to_children, grant_admin, access_ok):
2089 self, name, apply_to_children, grant_admin, access_ok):
2090 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
2090 RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
2091 user=TEST_USER_ADMIN_LOGIN,
2091 user=TEST_USER_ADMIN_LOGIN,
2092 perm='group.read',)
2092 perm='group.read',)
2093 Session().commit()
2093 Session().commit()
2094
2094
2095 if grant_admin:
2095 if grant_admin:
2096 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
2096 RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
2097 self.TEST_USER_LOGIN,
2097 self.TEST_USER_LOGIN,
2098 'group.admin')
2098 'group.admin')
2099 Session().commit()
2099 Session().commit()
2100
2100
2101 id_, params = _build_data(self.apikey_regular,
2101 id_, params = _build_data(self.apikey_regular,
2102 'revoke_user_group_permission_from_repo_group',
2102 'revoke_user_group_permission_from_repo_group',
2103 repogroupid=TEST_REPO_GROUP,
2103 repogroupid=TEST_REPO_GROUP,
2104 usergroupid=TEST_USER_GROUP,
2104 usergroupid=TEST_USER_GROUP,
2105 apply_to_children=apply_to_children,)
2105 apply_to_children=apply_to_children,)
2106 response = api_call(self, params)
2106 response = api_call(self, params)
2107 if access_ok:
2107 if access_ok:
2108 expected = {
2108 expected = {
2109 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2109 'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
2110 apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2110 apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
2111 ),
2111 ),
2112 'success': True
2112 'success': True
2113 }
2113 }
2114 self._compare_ok(id_, expected, given=response.body)
2114 self._compare_ok(id_, expected, given=response.body)
2115 else:
2115 else:
2116 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2116 expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
2117 self._compare_error(id_, expected, given=response.body)
2117 self._compare_error(id_, expected, given=response.body)
2118
2118
2119 @mock.patch.object(RepoGroupModel, 'revoke_user_group_permission', crash)
2119 @mock.patch.object(RepoGroupModel, 'revoke_user_group_permission', crash)
2120 def test_api_revoke_user_group_permission_from_repo_group_exception_when_adding(self):
2120 def test_api_revoke_user_group_permission_from_repo_group_exception_when_adding(self):
2121 id_, params = _build_data(self.apikey, 'revoke_user_group_permission_from_repo_group',
2121 id_, params = _build_data(self.apikey, 'revoke_user_group_permission_from_repo_group',
2122 repogroupid=TEST_REPO_GROUP,
2122 repogroupid=TEST_REPO_GROUP,
2123 usergroupid=TEST_USER_GROUP,)
2123 usergroupid=TEST_USER_GROUP,)
2124 response = api_call(self, params)
2124 response = api_call(self, params)
2125
2125
2126 expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2126 expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
2127 TEST_USER_GROUP, TEST_REPO_GROUP
2127 TEST_USER_GROUP, TEST_REPO_GROUP
2128 )
2128 )
2129 self._compare_error(id_, expected, given=response.body)
2129 self._compare_error(id_, expected, given=response.body)
2130
2130
2131 def test_api_get_gist(self):
2131 def test_api_get_gist(self):
2132 gist = fixture.create_gist()
2132 gist = fixture.create_gist()
2133 gist_id = gist.gist_access_id
2133 gist_id = gist.gist_access_id
2134 gist_created_on = gist.created_on
2134 gist_created_on = gist.created_on
2135 id_, params = _build_data(self.apikey, 'get_gist',
2135 id_, params = _build_data(self.apikey, 'get_gist',
2136 gistid=gist_id, )
2136 gistid=gist_id, )
2137 response = api_call(self, params)
2137 response = api_call(self, params)
2138
2138
2139 expected = {
2139 expected = {
2140 'access_id': gist_id,
2140 'access_id': gist_id,
2141 'created_on': gist_created_on,
2141 'created_on': gist_created_on,
2142 'description': 'new-gist',
2142 'description': 'new-gist',
2143 'expires': -1.0,
2143 'expires': -1.0,
2144 'gist_id': int(gist_id),
2144 'gist_id': int(gist_id),
2145 'type': 'public',
2145 'type': 'public',
2146 'url': 'http://localhost:80/_admin/gists/%s' % gist_id
2146 'url': 'http://localhost:80/_admin/gists/%s' % gist_id
2147 }
2147 }
2148
2148
2149 self._compare_ok(id_, expected, given=response.body)
2149 self._compare_ok(id_, expected, given=response.body)
2150
2150
2151 def test_api_get_gist_that_does_not_exist(self):
2151 def test_api_get_gist_that_does_not_exist(self):
2152 id_, params = _build_data(self.apikey_regular, 'get_gist',
2152 id_, params = _build_data(self.apikey_regular, 'get_gist',
2153 gistid='12345', )
2153 gistid='12345', )
2154 response = api_call(self, params)
2154 response = api_call(self, params)
2155 expected = 'gist `%s` does not exist' % ('12345',)
2155 expected = 'gist `%s` does not exist' % ('12345',)
2156 self._compare_error(id_, expected, given=response.body)
2156 self._compare_error(id_, expected, given=response.body)
2157
2157
2158 def test_api_get_gist_private_gist_without_permission(self):
2158 def test_api_get_gist_private_gist_without_permission(self):
2159 gist = fixture.create_gist()
2159 gist = fixture.create_gist()
2160 gist_id = gist.gist_access_id
2160 gist_id = gist.gist_access_id
2161 gist_created_on = gist.created_on
2161 gist_created_on = gist.created_on
2162 id_, params = _build_data(self.apikey_regular, 'get_gist',
2162 id_, params = _build_data(self.apikey_regular, 'get_gist',
2163 gistid=gist_id, )
2163 gistid=gist_id, )
2164 response = api_call(self, params)
2164 response = api_call(self, params)
2165
2165
2166 expected = 'gist `%s` does not exist' % gist_id
2166 expected = 'gist `%s` does not exist' % gist_id
2167 self._compare_error(id_, expected, given=response.body)
2167 self._compare_error(id_, expected, given=response.body)
2168
2168
2169 def test_api_get_gists(self):
2169 def test_api_get_gists(self):
2170 fixture.create_gist()
2170 fixture.create_gist()
2171 fixture.create_gist()
2171 fixture.create_gist()
2172
2172
2173 id_, params = _build_data(self.apikey, 'get_gists')
2173 id_, params = _build_data(self.apikey, 'get_gists')
2174 response = api_call(self, params)
2174 response = api_call(self, params)
2175 expected = response.json
2175 expected = response.json
2176 self.assertEqual(len(response.json['result']), 2)
2176 self.assertEqual(len(response.json['result']), 2)
2177 #self._compare_ok(id_, expected, given=response.body)
2177 #self._compare_ok(id_, expected, given=response.body)
2178
2178
2179 def test_api_get_gists_regular_user(self):
2179 def test_api_get_gists_regular_user(self):
2180 # by admin
2180 # by admin
2181 fixture.create_gist()
2181 fixture.create_gist()
2182 fixture.create_gist()
2182 fixture.create_gist()
2183
2183
2184 # by reg user
2184 # by reg user
2185 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2185 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2186 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2186 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2187 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2187 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2188
2188
2189 id_, params = _build_data(self.apikey_regular, 'get_gists')
2189 id_, params = _build_data(self.apikey_regular, 'get_gists')
2190 response = api_call(self, params)
2190 response = api_call(self, params)
2191 expected = response.json
2191 expected = response.json
2192 self.assertEqual(len(response.json['result']), 3)
2192 self.assertEqual(len(response.json['result']), 3)
2193 #self._compare_ok(id_, expected, given=response.body)
2193 #self._compare_ok(id_, expected, given=response.body)
2194
2194
2195 def test_api_get_gists_only_for_regular_user(self):
2195 def test_api_get_gists_only_for_regular_user(self):
2196 # by admin
2196 # by admin
2197 fixture.create_gist()
2197 fixture.create_gist()
2198 fixture.create_gist()
2198 fixture.create_gist()
2199
2199
2200 # by reg user
2200 # by reg user
2201 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2201 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2202 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2202 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2203 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2203 fixture.create_gist(owner=self.TEST_USER_LOGIN)
2204
2204
2205 id_, params = _build_data(self.apikey, 'get_gists',
2205 id_, params = _build_data(self.apikey, 'get_gists',
2206 userid=self.TEST_USER_LOGIN)
2206 userid=self.TEST_USER_LOGIN)
2207 response = api_call(self, params)
2207 response = api_call(self, params)
2208 expected = response.json
2208 expected = response.json
2209 self.assertEqual(len(response.json['result']), 3)
2209 self.assertEqual(len(response.json['result']), 3)
2210 #self._compare_ok(id_, expected, given=response.body)
2210 #self._compare_ok(id_, expected, given=response.body)
2211
2211
2212 def test_api_get_gists_regular_user_with_different_userid(self):
2212 def test_api_get_gists_regular_user_with_different_userid(self):
2213 id_, params = _build_data(self.apikey_regular, 'get_gists',
2213 id_, params = _build_data(self.apikey_regular, 'get_gists',
2214 userid=TEST_USER_ADMIN_LOGIN)
2214 userid=TEST_USER_ADMIN_LOGIN)
2215 response = api_call(self, params)
2215 response = api_call(self, params)
2216 expected = 'userid is not the same as your user'
2216 expected = 'userid is not the same as your user'
2217 self._compare_error(id_, expected, given=response.body)
2217 self._compare_error(id_, expected, given=response.body)
2218
2218
2219 def test_api_create_gist(self):
2219 def test_api_create_gist(self):
2220 id_, params = _build_data(self.apikey_regular, 'create_gist',
2220 id_, params = _build_data(self.apikey_regular, 'create_gist',
2221 lifetime=10,
2221 lifetime=10,
2222 description='foobar-gist',
2222 description='foobar-gist',
2223 gist_type='public',
2223 gist_type='public',
2224 files={'foobar': {'content': 'foo'}})
2224 files={'foobar': {'content': 'foo'}})
2225 response = api_call(self, params)
2225 response = api_call(self, params)
2226 response_json = response.json
2226 response_json = response.json
2227 expected = {
2227 expected = {
2228 'gist': {
2228 'gist': {
2229 'access_id': response_json['result']['gist']['access_id'],
2229 'access_id': response_json['result']['gist']['access_id'],
2230 'created_on': response_json['result']['gist']['created_on'],
2230 'created_on': response_json['result']['gist']['created_on'],
2231 'description': 'foobar-gist',
2231 'description': 'foobar-gist',
2232 'expires': response_json['result']['gist']['expires'],
2232 'expires': response_json['result']['gist']['expires'],
2233 'gist_id': response_json['result']['gist']['gist_id'],
2233 'gist_id': response_json['result']['gist']['gist_id'],
2234 'type': 'public',
2234 'type': 'public',
2235 'url': response_json['result']['gist']['url']
2235 'url': response_json['result']['gist']['url']
2236 },
2236 },
2237 'msg': 'created new gist'
2237 'msg': 'created new gist'
2238 }
2238 }
2239 self._compare_ok(id_, expected, given=response.body)
2239 self._compare_ok(id_, expected, given=response.body)
2240
2240
2241 @mock.patch.object(GistModel, 'create', crash)
2241 @mock.patch.object(GistModel, 'create', crash)
2242 def test_api_create_gist_exception_occured(self):
2242 def test_api_create_gist_exception_occured(self):
2243 id_, params = _build_data(self.apikey_regular, 'create_gist',
2243 id_, params = _build_data(self.apikey_regular, 'create_gist',
2244 files={})
2244 files={})
2245 response = api_call(self, params)
2245 response = api_call(self, params)
2246 expected = 'failed to create gist'
2246 expected = 'failed to create gist'
2247 self._compare_error(id_, expected, given=response.body)
2247 self._compare_error(id_, expected, given=response.body)
2248
2248
2249 def test_api_delete_gist(self):
2249 def test_api_delete_gist(self):
2250 gist_id = fixture.create_gist().gist_access_id
2250 gist_id = fixture.create_gist().gist_access_id
2251 id_, params = _build_data(self.apikey, 'delete_gist',
2251 id_, params = _build_data(self.apikey, 'delete_gist',
2252 gistid=gist_id)
2252 gistid=gist_id)
2253 response = api_call(self, params)
2253 response = api_call(self, params)
2254 expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
2254 expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
2255 self._compare_ok(id_, expected, given=response.body)
2255 self._compare_ok(id_, expected, given=response.body)
2256
2256
2257 def test_api_delete_gist_regular_user(self):
2257 def test_api_delete_gist_regular_user(self):
2258 gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id
2258 gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id
2259 id_, params = _build_data(self.apikey_regular, 'delete_gist',
2259 id_, params = _build_data(self.apikey_regular, 'delete_gist',
2260 gistid=gist_id)
2260 gistid=gist_id)
2261 response = api_call(self, params)
2261 response = api_call(self, params)
2262 expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
2262 expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
2263 self._compare_ok(id_, expected, given=response.body)
2263 self._compare_ok(id_, expected, given=response.body)
2264
2264
2265 def test_api_delete_gist_regular_user_no_permission(self):
2265 def test_api_delete_gist_regular_user_no_permission(self):
2266 gist_id = fixture.create_gist().gist_access_id
2266 gist_id = fixture.create_gist().gist_access_id
2267 id_, params = _build_data(self.apikey_regular, 'delete_gist',
2267 id_, params = _build_data(self.apikey_regular, 'delete_gist',
2268 gistid=gist_id)
2268 gistid=gist_id)
2269 response = api_call(self, params)
2269 response = api_call(self, params)
2270 expected = 'gist `%s` does not exist' % (gist_id,)
2270 expected = 'gist `%s` does not exist' % (gist_id,)
2271 self._compare_error(id_, expected, given=response.body)
2271 self._compare_error(id_, expected, given=response.body)
2272
2272
2273 @mock.patch.object(GistModel, 'delete', crash)
2273 @mock.patch.object(GistModel, 'delete', crash)
2274 def test_api_delete_gist_exception_occured(self):
2274 def test_api_delete_gist_exception_occured(self):
2275 gist_id = fixture.create_gist().gist_access_id
2275 gist_id = fixture.create_gist().gist_access_id
2276 id_, params = _build_data(self.apikey, 'delete_gist',
2276 id_, params = _build_data(self.apikey, 'delete_gist',
2277 gistid=gist_id)
2277 gistid=gist_id)
2278 response = api_call(self, params)
2278 response = api_call(self, params)
2279 expected = 'failed to delete gist ID:%s' % (gist_id,)
2279 expected = 'failed to delete gist ID:%s' % (gist_id,)
2280 self._compare_error(id_, expected, given=response.body)
2280 self._compare_error(id_, expected, given=response.body)
2281
2281
2282 def test_api_get_ip(self):
2282 def test_api_get_ip(self):
2283 id_, params = _build_data(self.apikey, 'get_ip')
2283 id_, params = _build_data(self.apikey, 'get_ip')
2284 response = api_call(self, params)
2284 response = api_call(self, params)
2285 expected = {
2285 expected = {
2286 'server_ip_addr': '0.0.0.0',
2286 'server_ip_addr': '0.0.0.0',
2287 'user_ips': []
2287 'user_ips': []
2288 }
2288 }
2289 self._compare_ok(id_, expected, given=response.body)
2289 self._compare_ok(id_, expected, given=response.body)
2290
2290
2291 def test_api_get_server_info(self):
2291 def test_api_get_server_info(self):
2292 id_, params = _build_data(self.apikey, 'get_server_info')
2292 id_, params = _build_data(self.apikey, 'get_server_info')
2293 response = api_call(self, params)
2293 response = api_call(self, params)
2294 expected = Setting.get_server_info()
2294 expected = Setting.get_server_info()
2295 self._compare_ok(id_, expected, given=response.body)
2295 self._compare_ok(id_, expected, given=response.body)
General Comments 0
You need to be logged in to leave comments. Login now