##// END OF EJS Templates
users: add extra checks on editing the default user...
Thomas De Schampheleire -
r5168:4e076ea7 default
parent child Browse files
Show More
@@ -1,484 +1,485 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.controllers.admin.users
16 16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
17 17
18 18 Users crud controller for pylons
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Apr 4, 2010
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28 import logging
29 29 import traceback
30 30 import formencode
31 31
32 32 from formencode import htmlfill
33 33 from pylons import request, tmpl_context as c, url, config
34 34 from pylons.controllers.util import redirect
35 35 from pylons.i18n.translation import _
36 36 from sqlalchemy.sql.expression import func
37 37 from webob.exc import HTTPNotFound
38 38
39 39 import kallithea
40 40 from kallithea.lib.exceptions import DefaultUserException, \
41 41 UserOwnsReposException, UserCreationError
42 42 from kallithea.lib import helpers as h
43 43 from kallithea.lib.auth import LoginRequired, HasPermissionAllDecorator, \
44 44 AuthUser
45 45 import kallithea.lib.auth_modules.auth_internal
46 46 from kallithea.lib import auth_modules
47 47 from kallithea.lib.base import BaseController, render
48 48 from kallithea.model.api_key import ApiKeyModel
49 49
50 50 from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
51 51 from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm
52 52 from kallithea.model.user import UserModel
53 53 from kallithea.model.meta import Session
54 54 from kallithea.lib.utils import action_logger
55 55 from kallithea.lib.compat import json
56 56 from kallithea.lib.utils2 import datetime_to_time, safe_int, generate_api_key
57 57
58 58 log = logging.getLogger(__name__)
59 59
60 60
61 61 class UsersController(BaseController):
62 62 """REST Controller styled on the Atom Publishing Protocol"""
63 63
64 64 @LoginRequired()
65 65 @HasPermissionAllDecorator('hg.admin')
66 66 def __before__(self):
67 67 super(UsersController, self).__before__()
68 68 c.available_permissions = config['available_permissions']
69 69 c.EXTERN_TYPE_INTERNAL = kallithea.EXTERN_TYPE_INTERNAL
70 70
71 71 def index(self, format='html'):
72 72 """GET /users: All items in the collection"""
73 73 # url('users')
74 74
75 75 c.users_list = User.query().order_by(User.username)\
76 76 .filter(User.username != User.DEFAULT_USER)\
77 77 .order_by(func.lower(User.username))\
78 78 .all()
79 79
80 80 users_data = []
81 81 total_records = len(c.users_list)
82 82 _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
83 83 template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
84 84
85 85 grav_tmpl = '<div class="gravatar">%s</div>'
86 86
87 87 username = lambda user_id, username: (
88 88 template.get_def("user_name")
89 89 .render(user_id, username, _=_, h=h, c=c))
90 90
91 91 user_actions = lambda user_id, username: (
92 92 template.get_def("user_actions")
93 93 .render(user_id, username, _=_, h=h, c=c))
94 94
95 95 for user in c.users_list:
96 96 users_data.append({
97 97 "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
98 98 "raw_name": user.username,
99 99 "username": username(user.user_id, user.username),
100 100 "firstname": h.escape(user.name),
101 101 "lastname": h.escape(user.lastname),
102 102 "last_login": h.fmt_date(user.last_login),
103 103 "last_login_raw": datetime_to_time(user.last_login),
104 104 "active": h.boolicon(user.active),
105 105 "admin": h.boolicon(user.admin),
106 106 "extern_type": user.extern_type,
107 107 "extern_name": user.extern_name,
108 108 "action": user_actions(user.user_id, user.username),
109 109 })
110 110
111 111 c.data = json.dumps({
112 112 "totalRecords": total_records,
113 113 "startIndex": 0,
114 114 "sort": None,
115 115 "dir": "asc",
116 116 "records": users_data
117 117 })
118 118
119 119 return render('admin/users/users.html')
120 120
121 121 def create(self):
122 122 """POST /users: Create a new item"""
123 123 # url('users')
124 124 c.default_extern_type = auth_modules.auth_internal.KallitheaAuthPlugin.name
125 125 user_model = UserModel()
126 126 user_form = UserForm()()
127 127 try:
128 128 form_result = user_form.to_python(dict(request.POST))
129 129 user = user_model.create(form_result)
130 130 usr = form_result['username']
131 131 action_logger(self.authuser, 'admin_created_user:%s' % usr,
132 132 None, self.ip_addr, self.sa)
133 133 h.flash(h.literal(_('Created user %s') % h.link_to(h.escape(usr), url('edit_user', id=user.user_id))),
134 134 category='success')
135 135 Session().commit()
136 136 except formencode.Invalid, errors:
137 137 return htmlfill.render(
138 138 render('admin/users/user_add.html'),
139 139 defaults=errors.value,
140 140 errors=errors.error_dict or {},
141 141 prefix_error=False,
142 142 encoding="UTF-8",
143 143 force_defaults=False)
144 144 except UserCreationError, e:
145 145 h.flash(e, 'error')
146 146 except Exception:
147 147 log.error(traceback.format_exc())
148 148 h.flash(_('Error occurred during creation of user %s') \
149 149 % request.POST.get('username'), category='error')
150 150 return redirect(url('users'))
151 151
152 152 def new(self, format='html'):
153 153 """GET /users/new: Form to create a new item"""
154 154 # url('new_user')
155 155 c.default_extern_type = auth_modules.auth_internal.KallitheaAuthPlugin.name
156 156 return render('admin/users/user_add.html')
157 157
158 158 def update(self, id):
159 159 """PUT /users/id: Update an existing item"""
160 160 # Forms posted to this method should contain a hidden field:
161 161 # <input type="hidden" name="_method" value="PUT" />
162 162 # Or using helpers:
163 163 # h.form(url('update_user', id=ID),
164 164 # method='put')
165 165 # url('user', id=ID)
166 166 c.active = 'profile'
167 167 user_model = UserModel()
168 168 c.user = user_model.get(id)
169 169 c.extern_type = c.user.extern_type
170 170 c.extern_name = c.user.extern_name
171 171 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
172 172 _form = UserForm(edit=True, old_data={'user_id': id,
173 173 'email': c.user.email})()
174 174 form_result = {}
175 175 try:
176 176 form_result = _form.to_python(dict(request.POST))
177 177 skip_attrs = ['extern_type', 'extern_name']
178 178 #TODO: plugin should define if username can be updated
179 179 if c.extern_type != kallithea.EXTERN_TYPE_INTERNAL:
180 180 # forbid updating username for external accounts
181 181 skip_attrs.append('username')
182 182
183 183 user_model.update(id, form_result, skip_attrs=skip_attrs)
184 184 usr = form_result['username']
185 185 action_logger(self.authuser, 'admin_updated_user:%s' % usr,
186 186 None, self.ip_addr, self.sa)
187 187 h.flash(_('User updated successfully'), category='success')
188 188 Session().commit()
189 189 except formencode.Invalid, errors:
190 190 defaults = errors.value
191 191 e = errors.error_dict or {}
192 192 defaults.update({
193 193 'create_repo_perm': user_model.has_perm(id,
194 194 'hg.create.repository'),
195 195 'fork_repo_perm': user_model.has_perm(id, 'hg.fork.repository'),
196 196 '_method': 'put'
197 197 })
198 198 return htmlfill.render(
199 199 render('admin/users/user_edit.html'),
200 200 defaults=defaults,
201 201 errors=e,
202 202 prefix_error=False,
203 203 encoding="UTF-8",
204 204 force_defaults=False)
205 205 except Exception:
206 206 log.error(traceback.format_exc())
207 207 h.flash(_('Error occurred during update of user %s') \
208 208 % form_result.get('username'), category='error')
209 209 return redirect(url('edit_user', id=id))
210 210
211 211 def delete(self, id):
212 212 """DELETE /users/id: Delete an existing item"""
213 213 # Forms posted to this method should contain a hidden field:
214 214 # <input type="hidden" name="_method" value="DELETE" />
215 215 # Or using helpers:
216 216 # h.form(url('delete_user', id=ID),
217 217 # method='delete')
218 218 # url('user', id=ID)
219 219 usr = User.get_or_404(id)
220 220 try:
221 221 UserModel().delete(usr)
222 222 Session().commit()
223 223 h.flash(_('Successfully deleted user'), category='success')
224 224 except (UserOwnsReposException, DefaultUserException), e:
225 225 h.flash(e, category='warning')
226 226 except Exception:
227 227 log.error(traceback.format_exc())
228 228 h.flash(_('An error occurred during deletion of user'),
229 229 category='error')
230 230 return redirect(url('users'))
231 231
232 232 def show(self, id, format='html'):
233 233 """GET /users/id: Show a specific item"""
234 234 # url('user', id=ID)
235 235 User.get_or_404(-1)
236 236
237 237 def _get_user_or_raise_if_default(self, id):
238 238 try:
239 239 return User.get_or_404(id, allow_default=False)
240 240 except DefaultUserException:
241 241 h.flash(_("The default user cannot be edited"), category='warning')
242 242 raise HTTPNotFound
243 243
244 244 def edit(self, id, format='html'):
245 245 """GET /users/id/edit: Form to edit an existing item"""
246 246 # url('edit_user', id=ID)
247 247 c.user = self._get_user_or_raise_if_default(id)
248 248 c.active = 'profile'
249 249 c.extern_type = c.user.extern_type
250 250 c.extern_name = c.user.extern_name
251 251 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
252 252
253 253 defaults = c.user.get_dict()
254 254 return htmlfill.render(
255 255 render('admin/users/user_edit.html'),
256 256 defaults=defaults,
257 257 encoding="UTF-8",
258 258 force_defaults=False)
259 259
260 260 def edit_advanced(self, id):
261 261 c.user = self._get_user_or_raise_if_default(id)
262 262 c.active = 'advanced'
263 263 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
264 264
265 265 umodel = UserModel()
266 266 defaults = c.user.get_dict()
267 267 defaults.update({
268 268 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
269 269 'create_user_group_perm': umodel.has_perm(c.user,
270 270 'hg.usergroup.create.true'),
271 271 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
272 272 })
273 273 return htmlfill.render(
274 274 render('admin/users/user_edit.html'),
275 275 defaults=defaults,
276 276 encoding="UTF-8",
277 277 force_defaults=False)
278 278
279 279 def edit_api_keys(self, id):
280 280 c.user = self._get_user_or_raise_if_default(id)
281 281 c.active = 'api_keys'
282 282 show_expired = True
283 283 c.lifetime_values = [
284 284 (str(-1), _('Forever')),
285 285 (str(5), _('5 minutes')),
286 286 (str(60), _('1 hour')),
287 287 (str(60 * 24), _('1 day')),
288 288 (str(60 * 24 * 30), _('1 month')),
289 289 ]
290 290 c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
291 291 c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id,
292 292 show_expired=show_expired)
293 293 defaults = c.user.get_dict()
294 294 return htmlfill.render(
295 295 render('admin/users/user_edit.html'),
296 296 defaults=defaults,
297 297 encoding="UTF-8",
298 298 force_defaults=False)
299 299
300 300 def add_api_key(self, id):
301 301 c.user = self._get_user_or_raise_if_default(id)
302 302
303 303 lifetime = safe_int(request.POST.get('lifetime'), -1)
304 304 description = request.POST.get('description')
305 305 ApiKeyModel().create(c.user.user_id, description, lifetime)
306 306 Session().commit()
307 307 h.flash(_("API key successfully created"), category='success')
308 308 return redirect(url('edit_user_api_keys', id=c.user.user_id))
309 309
310 310 def delete_api_key(self, id):
311 311 c.user = self._get_user_or_raise_if_default(id)
312 312
313 313 api_key = request.POST.get('del_api_key')
314 314 if request.POST.get('del_api_key_builtin'):
315 315 user = User.get(c.user.user_id)
316 316 if user:
317 317 user.api_key = generate_api_key(user.username)
318 318 Session().add(user)
319 319 Session().commit()
320 320 h.flash(_("API key successfully reset"), category='success')
321 321 elif api_key:
322 322 ApiKeyModel().delete(api_key, c.user.user_id)
323 323 Session().commit()
324 324 h.flash(_("API key successfully deleted"), category='success')
325 325
326 326 return redirect(url('edit_user_api_keys', id=c.user.user_id))
327 327
328 328 def update_account(self, id):
329 329 pass
330 330
331 331 def edit_perms(self, id):
332 332 c.user = self._get_user_or_raise_if_default(id)
333 333 c.active = 'perms'
334 334 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
335 335
336 336 umodel = UserModel()
337 337 defaults = c.user.get_dict()
338 338 defaults.update({
339 339 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
340 340 'create_user_group_perm': umodel.has_perm(c.user,
341 341 'hg.usergroup.create.true'),
342 342 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
343 343 })
344 344 return htmlfill.render(
345 345 render('admin/users/user_edit.html'),
346 346 defaults=defaults,
347 347 encoding="UTF-8",
348 348 force_defaults=False)
349 349
350 350 def update_perms(self, id):
351 351 """PUT /users_perm/id: Update an existing item"""
352 352 # url('user_perm', id=ID, method='put')
353 user = User.get_or_404(id)
353 user = self._get_user_or_raise_if_default(id)
354 354
355 355 try:
356 356 form = CustomDefaultPermissionsForm()()
357 357 form_result = form.to_python(request.POST)
358 358
359 359 inherit_perms = form_result['inherit_default_permissions']
360 360 user.inherit_default_permissions = inherit_perms
361 361 Session().add(user)
362 362 user_model = UserModel()
363 363
364 364 defs = UserToPerm.query()\
365 365 .filter(UserToPerm.user == user)\
366 366 .all()
367 367 for ug in defs:
368 368 Session().delete(ug)
369 369
370 370 if form_result['create_repo_perm']:
371 371 user_model.grant_perm(id, 'hg.create.repository')
372 372 else:
373 373 user_model.grant_perm(id, 'hg.create.none')
374 374 if form_result['create_user_group_perm']:
375 375 user_model.grant_perm(id, 'hg.usergroup.create.true')
376 376 else:
377 377 user_model.grant_perm(id, 'hg.usergroup.create.false')
378 378 if form_result['fork_repo_perm']:
379 379 user_model.grant_perm(id, 'hg.fork.repository')
380 380 else:
381 381 user_model.grant_perm(id, 'hg.fork.none')
382 382 h.flash(_("Updated permissions"), category='success')
383 383 Session().commit()
384 384 except Exception:
385 385 log.error(traceback.format_exc())
386 386 h.flash(_('An error occurred during permissions saving'),
387 387 category='error')
388 388 return redirect(url('edit_user_perms', id=id))
389 389
390 390 def edit_emails(self, id):
391 391 c.user = self._get_user_or_raise_if_default(id)
392 392 c.active = 'emails'
393 393 c.user_email_map = UserEmailMap.query()\
394 394 .filter(UserEmailMap.user == c.user).all()
395 395
396 396 defaults = c.user.get_dict()
397 397 return htmlfill.render(
398 398 render('admin/users/user_edit.html'),
399 399 defaults=defaults,
400 400 encoding="UTF-8",
401 401 force_defaults=False)
402 402
403 403 def add_email(self, id):
404 404 """POST /user_emails:Add an existing item"""
405 405 # url('user_emails', id=ID, method='put')
406
406 user = self._get_user_or_raise_if_default(id)
407 407 email = request.POST.get('new_email')
408 408 user_model = UserModel()
409 409
410 410 try:
411 411 user_model.add_extra_email(id, email)
412 412 Session().commit()
413 413 h.flash(_("Added email %s to user") % email, category='success')
414 414 except formencode.Invalid, error:
415 415 msg = error.error_dict['email']
416 416 h.flash(msg, category='error')
417 417 except Exception:
418 418 log.error(traceback.format_exc())
419 419 h.flash(_('An error occurred during email saving'),
420 420 category='error')
421 421 return redirect(url('edit_user_emails', id=id))
422 422
423 423 def delete_email(self, id):
424 424 """DELETE /user_emails_delete/id: Delete an existing item"""
425 425 # url('user_emails_delete', id=ID, method='delete')
426 user = self._get_user_or_raise_if_default(id)
426 427 email_id = request.POST.get('del_email_id')
427 428 user_model = UserModel()
428 429 user_model.delete_extra_email(id, email_id)
429 430 Session().commit()
430 431 h.flash(_("Removed email from user"), category='success')
431 432 return redirect(url('edit_user_emails', id=id))
432 433
433 434 def edit_ips(self, id):
434 435 c.user = self._get_user_or_raise_if_default(id)
435 436 c.active = 'ips'
436 437 c.user_ip_map = UserIpMap.query()\
437 438 .filter(UserIpMap.user == c.user).all()
438 439
439 440 c.inherit_default_ips = c.user.inherit_default_permissions
440 441 c.default_user_ip_map = UserIpMap.query()\
441 442 .filter(UserIpMap.user == User.get_default_user()).all()
442 443
443 444 defaults = c.user.get_dict()
444 445 return htmlfill.render(
445 446 render('admin/users/user_edit.html'),
446 447 defaults=defaults,
447 448 encoding="UTF-8",
448 449 force_defaults=False)
449 450
450 451 def add_ip(self, id):
451 452 """POST /user_ips:Add an existing item"""
452 453 # url('user_ips', id=ID, method='put')
453 454
454 455 ip = request.POST.get('new_ip')
455 456 user_model = UserModel()
456 457
457 458 try:
458 459 user_model.add_extra_ip(id, ip)
459 460 Session().commit()
460 461 h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
461 462 except formencode.Invalid, error:
462 463 msg = error.error_dict['ip']
463 464 h.flash(msg, category='error')
464 465 except Exception:
465 466 log.error(traceback.format_exc())
466 467 h.flash(_('An error occurred during ip saving'),
467 468 category='error')
468 469
469 470 if 'default_user' in request.POST:
470 471 return redirect(url('admin_permissions_ips'))
471 472 return redirect(url('edit_user_ips', id=id))
472 473
473 474 def delete_ip(self, id):
474 475 """DELETE /user_ips_delete/id: Delete an existing item"""
475 476 # url('user_ips_delete', id=ID, method='delete')
476 477 ip_id = request.POST.get('del_ip_id')
477 478 user_model = UserModel()
478 479 user_model.delete_extra_ip(id, ip_id)
479 480 Session().commit()
480 481 h.flash(_("Removed IP address from user whitelist"), category='success')
481 482
482 483 if 'default_user' in request.POST:
483 484 return redirect(url('admin_permissions_ips'))
484 485 return redirect(url('edit_user_ips', id=id))
@@ -1,578 +1,596 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14
15 15 from sqlalchemy.orm.exc import NoResultFound
16 16 from webob.exc import HTTPNotFound
17 17
18 18 from kallithea.tests import *
19 19 from kallithea.tests.fixture import Fixture
20 20 from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys
21 21 from kallithea.lib.auth import check_password
22 22 from kallithea.model.user import UserModel
23 23 from kallithea.model import validators
24 24 from kallithea.lib import helpers as h
25 25 from kallithea.model.meta import Session
26 26
27 27 fixture = Fixture()
28 28
29 29
30 30 class TestAdminUsersController(TestController):
31 31 test_user_1 = 'testme'
32 32
33 33 @classmethod
34 34 def teardown_class(cls):
35 35 if User.get_by_username(cls.test_user_1):
36 36 UserModel().delete(cls.test_user_1)
37 37 Session().commit()
38 38
39 39 def test_index(self):
40 40 self.log_user()
41 41 response = self.app.get(url('users'))
42 42 # Test response...
43 43
44 44 def test_create(self):
45 45 self.log_user()
46 46 username = 'newtestuser'
47 47 password = 'test12'
48 48 password_confirmation = password
49 49 name = 'name'
50 50 lastname = 'lastname'
51 51 email = 'mail@mail.com'
52 52
53 53 response = self.app.post(url('users'),
54 54 {'username': username,
55 55 'password': password,
56 56 'password_confirmation': password_confirmation,
57 57 'firstname': name,
58 58 'active': True,
59 59 'lastname': lastname,
60 60 'extern_name': 'internal',
61 61 'extern_type': 'internal',
62 62 'email': email,
63 63 '_authentication_token': self.authentication_token()})
64 64
65 65 self.checkSessionFlash(response, '''Created user <a href="/_admin/users/''')
66 66 self.checkSessionFlash(response, '''/edit">%s</a>''' % (username))
67 67
68 68 new_user = Session().query(User).\
69 69 filter(User.username == username).one()
70 70
71 71 self.assertEqual(new_user.username, username)
72 72 self.assertEqual(check_password(password, new_user.password), True)
73 73 self.assertEqual(new_user.name, name)
74 74 self.assertEqual(new_user.lastname, lastname)
75 75 self.assertEqual(new_user.email, email)
76 76
77 77 response.follow()
78 78 response = response.follow()
79 79 response.mustcontain("""newtestuser""")
80 80
81 81 def test_create_err(self):
82 82 self.log_user()
83 83 username = 'new_user'
84 84 password = ''
85 85 name = 'name'
86 86 lastname = 'lastname'
87 87 email = 'errmail.com'
88 88
89 89 response = self.app.post(url('users'), {'username': username,
90 90 'password': password,
91 91 'name': name,
92 92 'active': False,
93 93 'lastname': lastname,
94 94 'email': email,
95 95 '_authentication_token': self.authentication_token()})
96 96
97 97 msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
98 98 msg = h.html_escape(msg % {'username': 'new_user'})
99 99 response.mustcontain("""<span class="error-message">%s</span>""" % msg)
100 100 response.mustcontain("""<span class="error-message">Please enter a value</span>""")
101 101 response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
102 102
103 103 def get_user():
104 104 Session().query(User).filter(User.username == username).one()
105 105
106 106 self.assertRaises(NoResultFound, get_user), 'found user in database'
107 107
108 108 def test_new(self):
109 109 self.log_user()
110 110 response = self.app.get(url('new_user'))
111 111
112 112 @parameterized.expand(
113 113 [('firstname', {'firstname': 'new_username'}),
114 114 ('lastname', {'lastname': 'new_username'}),
115 115 ('admin', {'admin': True}),
116 116 ('admin', {'admin': False}),
117 117 ('extern_type', {'extern_type': 'ldap'}),
118 118 ('extern_type', {'extern_type': None}),
119 119 ('extern_name', {'extern_name': 'test'}),
120 120 ('extern_name', {'extern_name': None}),
121 121 ('active', {'active': False}),
122 122 ('active', {'active': True}),
123 123 ('email', {'email': 'some@email.com'}),
124 124 # ('new_password', {'new_password': 'foobar123',
125 125 # 'password_confirmation': 'foobar123'})
126 126 ])
127 127 def test_update(self, name, attrs):
128 128 self.log_user()
129 129 usr = fixture.create_user(self.test_user_1, password='qweqwe',
130 130 email='testme@example.com',
131 131 extern_type='internal',
132 132 extern_name=self.test_user_1,
133 133 skip_if_exists=True)
134 134 Session().commit()
135 135 params = usr.get_api_data(True)
136 136 params.update({'password_confirmation': ''})
137 137 params.update({'new_password': ''})
138 138 params.update(attrs)
139 139 if name == 'email':
140 140 params['emails'] = [attrs['email']]
141 141 if name == 'extern_type':
142 142 #cannot update this via form, expected value is original one
143 143 params['extern_type'] = "internal"
144 144 if name == 'extern_name':
145 145 #cannot update this via form, expected value is original one
146 146 params['extern_name'] = self.test_user_1
147 147 # special case since this user is not
148 148 # logged in yet his data is not filled
149 149 # so we use creation data
150 150
151 151 params.update({'_authentication_token': self.authentication_token()})
152 152 response = self.app.put(url('user', id=usr.user_id), params)
153 153 self.checkSessionFlash(response, 'User updated successfully')
154 154 params.pop('_authentication_token')
155 155
156 156 updated_user = User.get_by_username(self.test_user_1)
157 157 updated_params = updated_user.get_api_data(True)
158 158 updated_params.update({'password_confirmation': ''})
159 159 updated_params.update({'new_password': ''})
160 160
161 161 self.assertEqual(params, updated_params)
162 162
163 163 def test_delete(self):
164 164 self.log_user()
165 165 username = 'newtestuserdeleteme'
166 166
167 167 fixture.create_user(name=username)
168 168
169 169 new_user = Session().query(User)\
170 170 .filter(User.username == username).one()
171 171 response = self.app.delete(url('user', id=new_user.user_id))
172 172
173 173 self.checkSessionFlash(response, 'Successfully deleted user')
174 174
175 175 def test_delete_repo_err(self):
176 176 self.log_user()
177 177 username = 'repoerr'
178 178 reponame = 'repoerr_fail'
179 179
180 180 fixture.create_user(name=username)
181 181 fixture.create_repo(name=reponame, cur_user=username)
182 182
183 183 new_user = Session().query(User)\
184 184 .filter(User.username == username).one()
185 185 response = self.app.delete(url('user', id=new_user.user_id))
186 186 self.checkSessionFlash(response, 'User "%s" still '
187 187 'owns 1 repositories and cannot be removed. '
188 188 'Switch owners or remove those repositories: '
189 189 '%s' % (username, reponame))
190 190
191 191 response = self.app.delete(url('repo', repo_name=reponame))
192 192 self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
193 193
194 194 response = self.app.delete(url('user', id=new_user.user_id))
195 195 self.checkSessionFlash(response, 'Successfully deleted user')
196 196
197 197 def test_delete_repo_group_err(self):
198 198 self.log_user()
199 199 username = 'repogrouperr'
200 200 groupname = 'repogroup_fail'
201 201
202 202 fixture.create_user(name=username)
203 203 fixture.create_repo_group(name=groupname, cur_user=username)
204 204
205 205 new_user = Session().query(User)\
206 206 .filter(User.username == username).one()
207 207 response = self.app.delete(url('user', id=new_user.user_id))
208 208 self.checkSessionFlash(response, 'User "%s" still '
209 209 'owns 1 repository groups and cannot be removed. '
210 210 'Switch owners or remove those repository groups: '
211 211 '%s' % (username, groupname))
212 212
213 213 # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner
214 214 # rg = RepoGroup.get_by_group_name(group_name=groupname)
215 215 # response = self.app.get(url('repos_groups', id=rg.group_id))
216 216
217 217 response = self.app.delete(url('delete_repo_group', group_name=groupname))
218 218 self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
219 219
220 220 response = self.app.delete(url('user', id=new_user.user_id))
221 221 self.checkSessionFlash(response, 'Successfully deleted user')
222 222
223 223 def test_delete_user_group_err(self):
224 224 self.log_user()
225 225 username = 'usergrouperr'
226 226 groupname = 'usergroup_fail'
227 227
228 228 fixture.create_user(name=username)
229 229 ug = fixture.create_user_group(name=groupname, cur_user=username)
230 230
231 231 new_user = Session().query(User)\
232 232 .filter(User.username == username).one()
233 233 response = self.app.delete(url('user', id=new_user.user_id))
234 234 self.checkSessionFlash(response, 'User "%s" still '
235 235 'owns 1 user groups and cannot be removed. '
236 236 'Switch owners or remove those user groups: '
237 237 '%s' % (username, groupname))
238 238
239 239 # TODO: why do this fail?
240 240 #response = self.app.delete(url('delete_users_group', id=groupname))
241 241 #self.checkSessionFlash(response, 'Removed user group %s' % groupname)
242 242
243 243 fixture.destroy_user_group(ug.users_group_id)
244 244
245 245 response = self.app.delete(url('user', id=new_user.user_id))
246 246 self.checkSessionFlash(response, 'Successfully deleted user')
247 247
248 248 def test_show(self):
249 249 response = self.app.get(url('user', id=1))
250 250
251 251 def test_edit(self):
252 252 self.log_user()
253 253 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
254 254 response = self.app.get(url('edit_user', id=user.user_id))
255 255
256 256 def test_add_perm_create_repo(self):
257 257 self.log_user()
258 258 perm_none = Permission.get_by_key('hg.create.none')
259 259 perm_create = Permission.get_by_key('hg.create.repository')
260 260
261 261 user = UserModel().create_or_update(username='dummy', password='qwe',
262 262 email='dummy', firstname='a',
263 263 lastname='b')
264 264 Session().commit()
265 265 uid = user.user_id
266 266
267 267 try:
268 268 #User should have None permission on creation repository
269 269 self.assertEqual(UserModel().has_perm(user, perm_none), False)
270 270 self.assertEqual(UserModel().has_perm(user, perm_create), False)
271 271
272 272 response = self.app.post(url('edit_user_perms', id=uid),
273 273 params=dict(_method='put',
274 274 create_repo_perm=True,
275 275 _authentication_token=self.authentication_token()))
276 276
277 277 perm_none = Permission.get_by_key('hg.create.none')
278 278 perm_create = Permission.get_by_key('hg.create.repository')
279 279
280 280 #User should have None permission on creation repository
281 281 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
282 282 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
283 283 finally:
284 284 UserModel().delete(uid)
285 285 Session().commit()
286 286
287 287 def test_revoke_perm_create_repo(self):
288 288 self.log_user()
289 289 perm_none = Permission.get_by_key('hg.create.none')
290 290 perm_create = Permission.get_by_key('hg.create.repository')
291 291
292 292 user = UserModel().create_or_update(username='dummy', password='qwe',
293 293 email='dummy', firstname='a',
294 294 lastname='b')
295 295 Session().commit()
296 296 uid = user.user_id
297 297
298 298 try:
299 299 #User should have None permission on creation repository
300 300 self.assertEqual(UserModel().has_perm(user, perm_none), False)
301 301 self.assertEqual(UserModel().has_perm(user, perm_create), False)
302 302
303 303 response = self.app.post(url('edit_user_perms', id=uid),
304 304 params=dict(_method='put', _authentication_token=self.authentication_token()))
305 305
306 306 perm_none = Permission.get_by_key('hg.create.none')
307 307 perm_create = Permission.get_by_key('hg.create.repository')
308 308
309 309 #User should have None permission on creation repository
310 310 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
311 311 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
312 312 finally:
313 313 UserModel().delete(uid)
314 314 Session().commit()
315 315
316 316 def test_add_perm_fork_repo(self):
317 317 self.log_user()
318 318 perm_none = Permission.get_by_key('hg.fork.none')
319 319 perm_fork = Permission.get_by_key('hg.fork.repository')
320 320
321 321 user = UserModel().create_or_update(username='dummy', password='qwe',
322 322 email='dummy', firstname='a',
323 323 lastname='b')
324 324 Session().commit()
325 325 uid = user.user_id
326 326
327 327 try:
328 328 #User should have None permission on creation repository
329 329 self.assertEqual(UserModel().has_perm(user, perm_none), False)
330 330 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
331 331
332 332 response = self.app.post(url('edit_user_perms', id=uid),
333 333 params=dict(_method='put',
334 334 create_repo_perm=True,
335 335 _authentication_token=self.authentication_token()))
336 336
337 337 perm_none = Permission.get_by_key('hg.create.none')
338 338 perm_create = Permission.get_by_key('hg.create.repository')
339 339
340 340 #User should have None permission on creation repository
341 341 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
342 342 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
343 343 finally:
344 344 UserModel().delete(uid)
345 345 Session().commit()
346 346
347 347 def test_revoke_perm_fork_repo(self):
348 348 self.log_user()
349 349 perm_none = Permission.get_by_key('hg.fork.none')
350 350 perm_fork = Permission.get_by_key('hg.fork.repository')
351 351
352 352 user = UserModel().create_or_update(username='dummy', password='qwe',
353 353 email='dummy', firstname='a',
354 354 lastname='b')
355 355 Session().commit()
356 356 uid = user.user_id
357 357
358 358 try:
359 359 #User should have None permission on creation repository
360 360 self.assertEqual(UserModel().has_perm(user, perm_none), False)
361 361 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
362 362
363 363 response = self.app.post(url('edit_user_perms', id=uid),
364 364 params=dict(_method='put', _authentication_token=self.authentication_token()))
365 365
366 366 perm_none = Permission.get_by_key('hg.create.none')
367 367 perm_create = Permission.get_by_key('hg.create.repository')
368 368
369 369 #User should have None permission on creation repository
370 370 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
371 371 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
372 372 finally:
373 373 UserModel().delete(uid)
374 374 Session().commit()
375 375
376 376 def test_ips(self):
377 377 self.log_user()
378 378 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
379 379 response = self.app.get(url('edit_user_ips', id=user.user_id))
380 380 response.mustcontain('All IP addresses are allowed')
381 381
382 382 @parameterized.expand([
383 383 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
384 384 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
385 385 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
386 386 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
387 387 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
388 388 ('127_bad_ip', 'foobar', 'foobar', True),
389 389 ])
390 390 def test_add_ip(self, test_name, ip, ip_range, failure):
391 391 self.log_user()
392 392 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
393 393 user_id = user.user_id
394 394
395 395 response = self.app.put(url('edit_user_ips', id=user_id),
396 396 params=dict(new_ip=ip, _authentication_token=self.authentication_token()))
397 397
398 398 if failure:
399 399 self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
400 400 response = self.app.get(url('edit_user_ips', id=user_id))
401 401 response.mustcontain(no=[ip])
402 402 response.mustcontain(no=[ip_range])
403 403
404 404 else:
405 405 response = self.app.get(url('edit_user_ips', id=user_id))
406 406 response.mustcontain(ip)
407 407 response.mustcontain(ip_range)
408 408
409 409 ## cleanup
410 410 for del_ip in UserIpMap.query().filter(UserIpMap.user_id == user_id).all():
411 411 Session().delete(del_ip)
412 412 Session().commit()
413 413
414 414 def test_delete_ip(self):
415 415 self.log_user()
416 416 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
417 417 user_id = user.user_id
418 418 ip = '127.0.0.1/32'
419 419 ip_range = '127.0.0.1 - 127.0.0.1'
420 420 new_ip = UserModel().add_extra_ip(user_id, ip)
421 421 Session().commit()
422 422 new_ip_id = new_ip.ip_id
423 423
424 424 response = self.app.get(url('edit_user_ips', id=user_id))
425 425 response.mustcontain(ip)
426 426 response.mustcontain(ip_range)
427 427
428 428 self.app.post(url('edit_user_ips', id=user_id),
429 429 params=dict(_method='delete', del_ip_id=new_ip_id, _authentication_token=self.authentication_token()))
430 430
431 431 response = self.app.get(url('edit_user_ips', id=user_id))
432 432 response.mustcontain('All IP addresses are allowed')
433 433 response.mustcontain(no=[ip])
434 434 response.mustcontain(no=[ip_range])
435 435
436 436 def test_api_keys(self):
437 437 self.log_user()
438 438
439 439 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
440 440 response = self.app.get(url('edit_user_api_keys', id=user.user_id))
441 441 response.mustcontain(user.api_key)
442 442 response.mustcontain('Expires: Never')
443 443
444 444 @parameterized.expand([
445 445 ('forever', -1),
446 446 ('5mins', 60*5),
447 447 ('30days', 60*60*24*30),
448 448 ])
449 449 def test_add_api_keys(self, desc, lifetime):
450 450 self.log_user()
451 451 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
452 452 user_id = user.user_id
453 453
454 454 response = self.app.post(url('edit_user_api_keys', id=user_id),
455 455 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
456 456 self.checkSessionFlash(response, 'API key successfully created')
457 457 try:
458 458 response = response.follow()
459 459 user = User.get(user_id)
460 460 for api_key in user.api_keys:
461 461 response.mustcontain(api_key)
462 462 finally:
463 463 for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
464 464 Session().delete(api_key)
465 465 Session().commit()
466 466
467 467 def test_remove_api_key(self):
468 468 self.log_user()
469 469 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
470 470 user_id = user.user_id
471 471
472 472 response = self.app.post(url('edit_user_api_keys', id=user_id),
473 473 {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
474 474 self.checkSessionFlash(response, 'API key successfully created')
475 475 response = response.follow()
476 476
477 477 #now delete our key
478 478 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
479 479 self.assertEqual(1, len(keys))
480 480
481 481 response = self.app.post(url('edit_user_api_keys', id=user_id),
482 482 {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
483 483 self.checkSessionFlash(response, 'API key successfully deleted')
484 484 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
485 485 self.assertEqual(0, len(keys))
486 486
487 487 def test_reset_main_api_key(self):
488 488 self.log_user()
489 489 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
490 490 user_id = user.user_id
491 491 api_key = user.api_key
492 492 response = self.app.get(url('edit_user_api_keys', id=user_id))
493 493 response.mustcontain(api_key)
494 494 response.mustcontain('Expires: Never')
495 495
496 496 response = self.app.post(url('edit_user_api_keys', id=user_id),
497 497 {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
498 498 self.checkSessionFlash(response, 'API key successfully reset')
499 499 response = response.follow()
500 500 response.mustcontain(no=[api_key])
501 501
502 502 # TODO To be uncommented when pytest is the test runner
503 503 #import pytest
504 504 #from kallithea.controllers.admin.users import UsersController
505 505 #class TestAdminUsersController_unittest(object):
506 506 # """
507 507 # Unit tests for the users controller
508 508 # These are in a separate class, not deriving from TestController (and thus
509 509 # unittest.TestCase), to be able to benefit from pytest features like
510 510 # monkeypatch.
511 511 # """
512 512 # def test_get_user_or_raise_if_default(self, monkeypatch):
513 513 # # flash complains about an unexisting session
514 514 # def flash_mock(*args, **kwargs):
515 515 # pass
516 516 # monkeypatch.setattr(h, 'flash', flash_mock)
517 517 #
518 518 # u = UsersController()
519 519 # # a regular user should work correctly
520 520 # user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
521 521 # assert u._get_user_or_raise_if_default(user.user_id) == user
522 522 # # the default user should raise
523 523 # with pytest.raises(HTTPNotFound):
524 524 # u._get_user_or_raise_if_default(User.get_default_user().user_id)
525 525
526 526
527 527 class TestAdminUsersControllerForDefaultUser(TestController):
528 528 """
529 529 Edit actions on the default user are not allowed.
530 530 Validate that they throw a 404 exception.
531 531 """
532 532 def test_edit_default_user(self):
533 533 self.log_user()
534 534 user = User.get_default_user()
535 535 response = self.app.get(url('edit_user', id=user.user_id), status=404)
536 536
537 537 def test_edit_advanced_default_user(self):
538 538 self.log_user()
539 539 user = User.get_default_user()
540 540 response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
541 541
542 542 # API keys
543 543 def test_edit_api_keys_default_user(self):
544 544 self.log_user()
545 545 user = User.get_default_user()
546 546 response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
547 547
548 548 def test_add_api_keys_default_user(self):
549 549 self.log_user()
550 550 user = User.get_default_user()
551 551 response = self.app.post(url('edit_user_api_keys', id=user.user_id),
552 552 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
553 553
554 554 def test_delete_api_keys_default_user(self):
555 555 self.log_user()
556 556 user = User.get_default_user()
557 557 response = self.app.post(url('edit_user_api_keys', id=user.user_id),
558 558 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
559 559
560 560 # Permissions
561 561 def test_edit_perms_default_user(self):
562 562 self.log_user()
563 563 user = User.get_default_user()
564 564 response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
565 565
566 def test_update_perms_default_user(self):
567 self.log_user()
568 user = User.get_default_user()
569 response = self.app.post(url('edit_user_perms', id=user.user_id),
570 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
571
566 572 # E-mails
567 573 def test_edit_emails_default_user(self):
568 574 self.log_user()
569 575 user = User.get_default_user()
570 576 response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
571 577
578 def test_add_emails_default_user(self):
579 self.log_user()
580 user = User.get_default_user()
581 response = self.app.post(url('edit_user_emails', id=user.user_id),
582 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
583
584 def test_delete_emails_default_user(self):
585 self.log_user()
586 user = User.get_default_user()
587 response = self.app.post(url('edit_user_emails', id=user.user_id),
588 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
589
572 590 # IP addresses
573 591 # Add/delete of IP addresses for the default user is used to maintain
574 592 # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
575 593 def test_edit_ip_default_user(self):
576 594 self.log_user()
577 595 user = User.get_default_user()
578 596 response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
General Comments 0
You need to be logged in to leave comments. Login now