##// END OF EJS Templates
admin: don't redirect back to the list of users after creating a user...
Mads Kiilerich -
r6118:9b80c2a6 default
parent child Browse files
Show More
@@ -1,446 +1,445 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.controllers.admin.users
16 16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
17 17
18 18 Users crud controller for pylons
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Apr 4, 2010
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28 import logging
29 29 import traceback
30 30 import formencode
31 31
32 32 from formencode import htmlfill
33 33 from pylons import request, tmpl_context as c, url, config
34 34 from pylons.i18n.translation import _
35 35 from sqlalchemy.sql.expression import func
36 36 from webob.exc import HTTPFound, HTTPNotFound
37 37
38 38 import kallithea
39 39 from kallithea.lib.exceptions import DefaultUserException, \
40 40 UserOwnsReposException, UserCreationError
41 41 from kallithea.lib import helpers as h
42 42 from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator, \
43 43 AuthUser
44 44 from kallithea.lib import auth_modules
45 45 from kallithea.lib.auth_modules import auth_internal
46 46 from kallithea.lib.base import BaseController, render
47 47 from kallithea.model.api_key import ApiKeyModel
48 48
49 49 from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
50 50 from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm
51 51 from kallithea.model.user import UserModel
52 52 from kallithea.model.meta import Session
53 53 from kallithea.lib.utils import action_logger
54 54 from kallithea.lib.compat import json
55 55 from kallithea.lib.utils2 import datetime_to_time, safe_int, generate_api_key
56 56
57 57 log = logging.getLogger(__name__)
58 58
59 59
60 60 class UsersController(BaseController):
61 61 """REST Controller styled on the Atom Publishing Protocol"""
62 62
63 63 @LoginRequired()
64 64 @HasPermissionAnyDecorator('hg.admin')
65 65 def __before__(self):
66 66 super(UsersController, self).__before__()
67 67 c.available_permissions = config['available_permissions']
68 68 c.EXTERN_TYPE_INTERNAL = kallithea.EXTERN_TYPE_INTERNAL
69 69
70 70 def index(self, format='html'):
71 71 c.users_list = User.query().order_by(User.username) \
72 72 .filter(User.username != User.DEFAULT_USER) \
73 73 .order_by(func.lower(User.username)) \
74 74 .all()
75 75
76 76 users_data = []
77 77 total_records = len(c.users_list)
78 78 _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
79 79 template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
80 80
81 81 grav_tmpl = '<div class="gravatar">%s</div>'
82 82
83 83 username = lambda user_id, username: (
84 84 template.get_def("user_name")
85 85 .render(user_id, username, _=_, h=h, c=c))
86 86
87 87 user_actions = lambda user_id, username: (
88 88 template.get_def("user_actions")
89 89 .render(user_id, username, _=_, h=h, c=c))
90 90
91 91 for user in c.users_list:
92 92 users_data.append({
93 93 "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
94 94 "raw_name": user.username,
95 95 "username": username(user.user_id, user.username),
96 96 "firstname": h.escape(user.name),
97 97 "lastname": h.escape(user.lastname),
98 98 "last_login": h.fmt_date(user.last_login),
99 99 "last_login_raw": datetime_to_time(user.last_login),
100 100 "active": h.boolicon(user.active),
101 101 "admin": h.boolicon(user.admin),
102 102 "extern_type": user.extern_type,
103 103 "extern_name": user.extern_name,
104 104 "action": user_actions(user.user_id, user.username),
105 105 })
106 106
107 107 c.data = json.dumps({
108 108 "totalRecords": total_records,
109 109 "startIndex": 0,
110 110 "sort": None,
111 111 "dir": "asc",
112 112 "records": users_data
113 113 })
114 114
115 115 return render('admin/users/users.html')
116 116
117 117 def create(self):
118 118 c.default_extern_type = auth_internal.KallitheaAuthPlugin.name
119 119 c.default_extern_name = auth_internal.KallitheaAuthPlugin.name
120 120 user_model = UserModel()
121 121 user_form = UserForm()()
122 122 try:
123 123 form_result = user_form.to_python(dict(request.POST))
124 124 user = user_model.create(form_result)
125 usr = form_result['username']
126 action_logger(self.authuser, 'admin_created_user:%s' % usr,
125 action_logger(self.authuser, 'admin_created_user:%s' % user.username,
127 126 None, self.ip_addr, self.sa)
128 h.flash(h.literal(_('Created user %s') % h.link_to(h.escape(usr), url('edit_user', id=user.user_id))),
127 h.flash(_('Created user %s') % user.username,
129 128 category='success')
130 129 Session().commit()
131 130 except formencode.Invalid as errors:
132 131 return htmlfill.render(
133 132 render('admin/users/user_add.html'),
134 133 defaults=errors.value,
135 134 errors=errors.error_dict or {},
136 135 prefix_error=False,
137 136 encoding="UTF-8",
138 137 force_defaults=False)
139 138 except UserCreationError as e:
140 139 h.flash(e, 'error')
141 140 except Exception:
142 141 log.error(traceback.format_exc())
143 142 h.flash(_('Error occurred during creation of user %s') \
144 143 % request.POST.get('username'), category='error')
145 raise HTTPFound(location=url('users'))
144 raise HTTPFound(location=url('edit_user', id=user.user_id))
146 145
147 146 def new(self, format='html'):
148 147 c.default_extern_type = auth_internal.KallitheaAuthPlugin.name
149 148 c.default_extern_name = auth_internal.KallitheaAuthPlugin.name
150 149 return render('admin/users/user_add.html')
151 150
152 151 def update(self, id):
153 152 user_model = UserModel()
154 153 user = user_model.get(id)
155 154 _form = UserForm(edit=True, old_data={'user_id': id,
156 155 'email': user.email})()
157 156 form_result = {}
158 157 try:
159 158 form_result = _form.to_python(dict(request.POST))
160 159 skip_attrs = ['extern_type', 'extern_name',
161 160 ] + auth_modules.get_managed_fields(user)
162 161
163 162 user_model.update(id, form_result, skip_attrs=skip_attrs)
164 163 usr = form_result['username']
165 164 action_logger(self.authuser, 'admin_updated_user:%s' % usr,
166 165 None, self.ip_addr, self.sa)
167 166 h.flash(_('User updated successfully'), category='success')
168 167 Session().commit()
169 168 except formencode.Invalid as errors:
170 169 defaults = errors.value
171 170 e = errors.error_dict or {}
172 171 defaults.update({
173 172 'create_repo_perm': user_model.has_perm(id,
174 173 'hg.create.repository'),
175 174 'fork_repo_perm': user_model.has_perm(id, 'hg.fork.repository'),
176 175 })
177 176 return htmlfill.render(
178 177 self._render_edit_profile(user),
179 178 defaults=defaults,
180 179 errors=e,
181 180 prefix_error=False,
182 181 encoding="UTF-8",
183 182 force_defaults=False)
184 183 except Exception:
185 184 log.error(traceback.format_exc())
186 185 h.flash(_('Error occurred during update of user %s') \
187 186 % form_result.get('username'), category='error')
188 187 raise HTTPFound(location=url('edit_user', id=id))
189 188
190 189 def delete(self, id):
191 190 usr = User.get_or_404(id)
192 191 try:
193 192 UserModel().delete(usr)
194 193 Session().commit()
195 194 h.flash(_('Successfully deleted user'), category='success')
196 195 except (UserOwnsReposException, DefaultUserException) as e:
197 196 h.flash(e, category='warning')
198 197 except Exception:
199 198 log.error(traceback.format_exc())
200 199 h.flash(_('An error occurred during deletion of user'),
201 200 category='error')
202 201 raise HTTPFound(location=url('users'))
203 202
204 203 def _get_user_or_raise_if_default(self, id):
205 204 try:
206 205 return User.get_or_404(id, allow_default=False)
207 206 except DefaultUserException:
208 207 h.flash(_("The default user cannot be edited"), category='warning')
209 208 raise HTTPNotFound
210 209
211 210 def _render_edit_profile(self, user):
212 211 c.user = user
213 212 c.active = 'profile'
214 213 c.perm_user = AuthUser(dbuser=user)
215 214 c.ip_addr = self.ip_addr
216 215 managed_fields = auth_modules.get_managed_fields(user)
217 216 c.readonly = lambda n: 'readonly' if n in managed_fields else None
218 217 return render('admin/users/user_edit.html')
219 218
220 219 def edit(self, id, format='html'):
221 220 user = self._get_user_or_raise_if_default(id)
222 221 defaults = user.get_dict()
223 222
224 223 return htmlfill.render(
225 224 self._render_edit_profile(user),
226 225 defaults=defaults,
227 226 encoding="UTF-8",
228 227 force_defaults=False)
229 228
230 229 def edit_advanced(self, id):
231 230 c.user = self._get_user_or_raise_if_default(id)
232 231 c.active = 'advanced'
233 232 c.perm_user = AuthUser(user_id=id)
234 233 c.ip_addr = self.ip_addr
235 234
236 235 umodel = UserModel()
237 236 defaults = c.user.get_dict()
238 237 defaults.update({
239 238 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
240 239 'create_user_group_perm': umodel.has_perm(c.user,
241 240 'hg.usergroup.create.true'),
242 241 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
243 242 })
244 243 return htmlfill.render(
245 244 render('admin/users/user_edit.html'),
246 245 defaults=defaults,
247 246 encoding="UTF-8",
248 247 force_defaults=False)
249 248
250 249 def edit_api_keys(self, id):
251 250 c.user = self._get_user_or_raise_if_default(id)
252 251 c.active = 'api_keys'
253 252 show_expired = True
254 253 c.lifetime_values = [
255 254 (str(-1), _('Forever')),
256 255 (str(5), _('5 minutes')),
257 256 (str(60), _('1 hour')),
258 257 (str(60 * 24), _('1 day')),
259 258 (str(60 * 24 * 30), _('1 month')),
260 259 ]
261 260 c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
262 261 c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id,
263 262 show_expired=show_expired)
264 263 defaults = c.user.get_dict()
265 264 return htmlfill.render(
266 265 render('admin/users/user_edit.html'),
267 266 defaults=defaults,
268 267 encoding="UTF-8",
269 268 force_defaults=False)
270 269
271 270 def add_api_key(self, id):
272 271 c.user = self._get_user_or_raise_if_default(id)
273 272
274 273 lifetime = safe_int(request.POST.get('lifetime'), -1)
275 274 description = request.POST.get('description')
276 275 ApiKeyModel().create(c.user.user_id, description, lifetime)
277 276 Session().commit()
278 277 h.flash(_("API key successfully created"), category='success')
279 278 raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))
280 279
281 280 def delete_api_key(self, id):
282 281 c.user = self._get_user_or_raise_if_default(id)
283 282
284 283 api_key = request.POST.get('del_api_key')
285 284 if request.POST.get('del_api_key_builtin'):
286 285 user = User.get(c.user.user_id)
287 286 if user is not None:
288 287 user.api_key = generate_api_key()
289 288 Session().add(user)
290 289 Session().commit()
291 290 h.flash(_("API key successfully reset"), category='success')
292 291 elif api_key:
293 292 ApiKeyModel().delete(api_key, c.user.user_id)
294 293 Session().commit()
295 294 h.flash(_("API key successfully deleted"), category='success')
296 295
297 296 raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))
298 297
299 298 def update_account(self, id):
300 299 pass
301 300
302 301 def edit_perms(self, id):
303 302 c.user = self._get_user_or_raise_if_default(id)
304 303 c.active = 'perms'
305 304 c.perm_user = AuthUser(user_id=id)
306 305 c.ip_addr = self.ip_addr
307 306
308 307 umodel = UserModel()
309 308 defaults = c.user.get_dict()
310 309 defaults.update({
311 310 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
312 311 'create_user_group_perm': umodel.has_perm(c.user,
313 312 'hg.usergroup.create.true'),
314 313 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
315 314 })
316 315 return htmlfill.render(
317 316 render('admin/users/user_edit.html'),
318 317 defaults=defaults,
319 318 encoding="UTF-8",
320 319 force_defaults=False)
321 320
322 321 def update_perms(self, id):
323 322 user = self._get_user_or_raise_if_default(id)
324 323
325 324 try:
326 325 form = CustomDefaultPermissionsForm()()
327 326 form_result = form.to_python(request.POST)
328 327
329 328 inherit_perms = form_result['inherit_default_permissions']
330 329 user.inherit_default_permissions = inherit_perms
331 330 Session().add(user)
332 331 user_model = UserModel()
333 332
334 333 defs = UserToPerm.query() \
335 334 .filter(UserToPerm.user == user) \
336 335 .all()
337 336 for ug in defs:
338 337 Session().delete(ug)
339 338
340 339 if form_result['create_repo_perm']:
341 340 user_model.grant_perm(id, 'hg.create.repository')
342 341 else:
343 342 user_model.grant_perm(id, 'hg.create.none')
344 343 if form_result['create_user_group_perm']:
345 344 user_model.grant_perm(id, 'hg.usergroup.create.true')
346 345 else:
347 346 user_model.grant_perm(id, 'hg.usergroup.create.false')
348 347 if form_result['fork_repo_perm']:
349 348 user_model.grant_perm(id, 'hg.fork.repository')
350 349 else:
351 350 user_model.grant_perm(id, 'hg.fork.none')
352 351 h.flash(_("Updated permissions"), category='success')
353 352 Session().commit()
354 353 except Exception:
355 354 log.error(traceback.format_exc())
356 355 h.flash(_('An error occurred during permissions saving'),
357 356 category='error')
358 357 raise HTTPFound(location=url('edit_user_perms', id=id))
359 358
360 359 def edit_emails(self, id):
361 360 c.user = self._get_user_or_raise_if_default(id)
362 361 c.active = 'emails'
363 362 c.user_email_map = UserEmailMap.query() \
364 363 .filter(UserEmailMap.user == c.user).all()
365 364
366 365 defaults = c.user.get_dict()
367 366 return htmlfill.render(
368 367 render('admin/users/user_edit.html'),
369 368 defaults=defaults,
370 369 encoding="UTF-8",
371 370 force_defaults=False)
372 371
373 372 def add_email(self, id):
374 373 user = self._get_user_or_raise_if_default(id)
375 374 email = request.POST.get('new_email')
376 375 user_model = UserModel()
377 376
378 377 try:
379 378 user_model.add_extra_email(id, email)
380 379 Session().commit()
381 380 h.flash(_("Added email %s to user") % email, category='success')
382 381 except formencode.Invalid as error:
383 382 msg = error.error_dict['email']
384 383 h.flash(msg, category='error')
385 384 except Exception:
386 385 log.error(traceback.format_exc())
387 386 h.flash(_('An error occurred during email saving'),
388 387 category='error')
389 388 raise HTTPFound(location=url('edit_user_emails', id=id))
390 389
391 390 def delete_email(self, id):
392 391 user = self._get_user_or_raise_if_default(id)
393 392 email_id = request.POST.get('del_email_id')
394 393 user_model = UserModel()
395 394 user_model.delete_extra_email(id, email_id)
396 395 Session().commit()
397 396 h.flash(_("Removed email from user"), category='success')
398 397 raise HTTPFound(location=url('edit_user_emails', id=id))
399 398
400 399 def edit_ips(self, id):
401 400 c.user = self._get_user_or_raise_if_default(id)
402 401 c.active = 'ips'
403 402 c.user_ip_map = UserIpMap.query() \
404 403 .filter(UserIpMap.user == c.user).all()
405 404
406 405 c.inherit_default_ips = c.user.inherit_default_permissions
407 406 c.default_user_ip_map = UserIpMap.query() \
408 407 .filter(UserIpMap.user == User.get_default_user()).all()
409 408
410 409 defaults = c.user.get_dict()
411 410 return htmlfill.render(
412 411 render('admin/users/user_edit.html'),
413 412 defaults=defaults,
414 413 encoding="UTF-8",
415 414 force_defaults=False)
416 415
417 416 def add_ip(self, id):
418 417 ip = request.POST.get('new_ip')
419 418 user_model = UserModel()
420 419
421 420 try:
422 421 user_model.add_extra_ip(id, ip)
423 422 Session().commit()
424 423 h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
425 424 except formencode.Invalid as error:
426 425 msg = error.error_dict['ip']
427 426 h.flash(msg, category='error')
428 427 except Exception:
429 428 log.error(traceback.format_exc())
430 429 h.flash(_('An error occurred while adding IP address'),
431 430 category='error')
432 431
433 432 if 'default_user' in request.POST:
434 433 raise HTTPFound(location=url('admin_permissions_ips'))
435 434 raise HTTPFound(location=url('edit_user_ips', id=id))
436 435
437 436 def delete_ip(self, id):
438 437 ip_id = request.POST.get('del_ip_id')
439 438 user_model = UserModel()
440 439 user_model.delete_extra_ip(id, ip_id)
441 440 Session().commit()
442 441 h.flash(_("Removed IP address from user whitelist"), category='success')
443 442
444 443 if 'default_user' in request.POST:
445 444 raise HTTPFound(location=url('admin_permissions_ips'))
446 445 raise HTTPFound(location=url('edit_user_ips', id=id))
@@ -1,602 +1,602 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14
15 15 from sqlalchemy.orm.exc import NoResultFound, ObjectDeletedError
16 16
17 17 import pytest
18 18 from kallithea.tests import *
19 19 from kallithea.tests.fixture import Fixture
20 20 from kallithea.controllers.admin.users import UsersController
21 21 from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys
22 22 from kallithea.lib.auth import check_password
23 23 from kallithea.model.user import UserModel
24 24 from kallithea.model import validators
25 25 from kallithea.lib import helpers as h
26 26 from kallithea.model.meta import Session
27 27 from webob.exc import HTTPNotFound
28 28
29 29 fixture = Fixture()
30 30
31 31 @pytest.yield_fixture
32 32 def user_and_repo_group_fail():
33 33 username = 'repogrouperr'
34 34 groupname = u'repogroup_fail'
35 35 user = fixture.create_user(name=username)
36 36 repo_group = fixture.create_repo_group(name=groupname, cur_user=username)
37 37 yield user, repo_group
38 38 # cleanup
39 39 try:
40 40 fixture.destroy_repo_group(repo_group)
41 41 except ObjectDeletedError:
42 42 # delete already succeeded in test body
43 43 pass
44 44
45 45 class TestAdminUsersController(TestController):
46 46 test_user_1 = 'testme'
47 47
48 48 @classmethod
49 49 def teardown_class(cls):
50 50 if User.get_by_username(cls.test_user_1):
51 51 UserModel().delete(cls.test_user_1)
52 52 Session().commit()
53 53
54 54 def test_index(self):
55 55 self.log_user()
56 56 response = self.app.get(url('users'))
57 57 # Test response...
58 58
59 59 def test_create(self):
60 60 self.log_user()
61 61 username = 'newtestuser'
62 62 password = 'test12'
63 63 password_confirmation = password
64 64 name = u'name'
65 65 lastname = u'lastname'
66 66 email = 'mail@example.com'
67 67
68 68 response = self.app.post(url('users'),
69 69 {'username': username,
70 70 'password': password,
71 71 'password_confirmation': password_confirmation,
72 72 'firstname': name,
73 73 'active': True,
74 74 'lastname': lastname,
75 75 'extern_name': 'internal',
76 76 'extern_type': 'internal',
77 77 'email': email,
78 78 '_authentication_token': self.authentication_token()})
79 # 302 Found
80 # The resource was found at http://localhost/_admin/users/5/edit; you should be redirected automatically.
79 81
80 self.checkSessionFlash(response, '''Created user <a href="/_admin/users/''')
81 self.checkSessionFlash(response, '''/edit">%s</a>''' % (username))
82 self.checkSessionFlash(response, '''Created user %s''' % username)
83
84 response = response.follow()
85 response.mustcontain("""%s user settings""" % username) # in <title>
82 86
83 87 new_user = Session().query(User). \
84 88 filter(User.username == username).one()
85 89
86 90 assert new_user.username == username
87 91 assert check_password(password, new_user.password) == True
88 92 assert new_user.name == name
89 93 assert new_user.lastname == lastname
90 94 assert new_user.email == email
91 95
92 response.follow()
93 response = response.follow()
94 response.mustcontain("""newtestuser""")
95
96 96 def test_create_err(self):
97 97 self.log_user()
98 98 username = 'new_user'
99 99 password = ''
100 100 name = u'name'
101 101 lastname = u'lastname'
102 102 email = 'errmail.example.com'
103 103
104 104 response = self.app.post(url('users'), {'username': username,
105 105 'password': password,
106 106 'name': name,
107 107 'active': False,
108 108 'lastname': lastname,
109 109 'email': email,
110 110 '_authentication_token': self.authentication_token()})
111 111
112 112 msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
113 113 msg = h.html_escape(msg % {'username': 'new_user'})
114 114 response.mustcontain("""<span class="error-message">%s</span>""" % msg)
115 115 response.mustcontain("""<span class="error-message">Please enter a value</span>""")
116 116 response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
117 117
118 118 def get_user():
119 119 Session().query(User).filter(User.username == username).one()
120 120
121 121 with pytest.raises(NoResultFound):
122 122 get_user(), 'found user in database'
123 123
124 124 def test_new(self):
125 125 self.log_user()
126 126 response = self.app.get(url('new_user'))
127 127
128 128 @parametrize('name,attrs',
129 129 [('firstname', {'firstname': 'new_username'}),
130 130 ('lastname', {'lastname': 'new_username'}),
131 131 ('admin', {'admin': True}),
132 132 ('admin', {'admin': False}),
133 133 ('extern_type', {'extern_type': 'ldap'}),
134 134 ('extern_type', {'extern_type': None}),
135 135 ('extern_name', {'extern_name': 'test'}),
136 136 ('extern_name', {'extern_name': None}),
137 137 ('active', {'active': False}),
138 138 ('active', {'active': True}),
139 139 ('email', {'email': 'someemail@example.com'}),
140 140 # ('new_password', {'new_password': 'foobar123',
141 141 # 'password_confirmation': 'foobar123'})
142 142 ])
143 143 def test_update(self, name, attrs):
144 144 self.log_user()
145 145 usr = fixture.create_user(self.test_user_1, password='qweqwe',
146 146 email='testme@example.com',
147 147 extern_type='internal',
148 148 extern_name=self.test_user_1,
149 149 skip_if_exists=True)
150 150 Session().commit()
151 151 params = usr.get_api_data(True)
152 152 params.update({'password_confirmation': ''})
153 153 params.update({'new_password': ''})
154 154 params.update(attrs)
155 155 if name == 'email':
156 156 params['emails'] = [attrs['email']]
157 157 if name == 'extern_type':
158 158 #cannot update this via form, expected value is original one
159 159 params['extern_type'] = "internal"
160 160 if name == 'extern_name':
161 161 #cannot update this via form, expected value is original one
162 162 params['extern_name'] = self.test_user_1
163 163 # special case since this user is not
164 164 # logged in yet his data is not filled
165 165 # so we use creation data
166 166
167 167 params.update({'_authentication_token': self.authentication_token()})
168 168 response = self.app.post(url('update_user', id=usr.user_id), params)
169 169 self.checkSessionFlash(response, 'User updated successfully')
170 170 params.pop('_authentication_token')
171 171
172 172 updated_user = User.get_by_username(self.test_user_1)
173 173 updated_params = updated_user.get_api_data(True)
174 174 updated_params.update({'password_confirmation': ''})
175 175 updated_params.update({'new_password': ''})
176 176
177 177 assert params == updated_params
178 178
179 179 def test_delete(self):
180 180 self.log_user()
181 181 username = 'newtestuserdeleteme'
182 182
183 183 fixture.create_user(name=username)
184 184
185 185 new_user = Session().query(User) \
186 186 .filter(User.username == username).one()
187 187 response = self.app.post(url('delete_user', id=new_user.user_id),
188 188 params={'_authentication_token': self.authentication_token()})
189 189
190 190 self.checkSessionFlash(response, 'Successfully deleted user')
191 191
192 192 def test_delete_repo_err(self):
193 193 self.log_user()
194 194 username = 'repoerr'
195 195 reponame = u'repoerr_fail'
196 196
197 197 fixture.create_user(name=username)
198 198 fixture.create_repo(name=reponame, cur_user=username)
199 199
200 200 new_user = Session().query(User) \
201 201 .filter(User.username == username).one()
202 202 response = self.app.post(url('delete_user', id=new_user.user_id),
203 203 params={'_authentication_token': self.authentication_token()})
204 204 self.checkSessionFlash(response, 'User "%s" still '
205 205 'owns 1 repositories and cannot be removed. '
206 206 'Switch owners or remove those repositories: '
207 207 '%s' % (username, reponame))
208 208
209 209 response = self.app.post(url('delete_repo', repo_name=reponame),
210 210 params={'_authentication_token': self.authentication_token()})
211 211 self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
212 212
213 213 response = self.app.post(url('delete_user', id=new_user.user_id),
214 214 params={'_authentication_token': self.authentication_token()})
215 215 self.checkSessionFlash(response, 'Successfully deleted user')
216 216
217 217 def test_delete_repo_group_err(self, user_and_repo_group_fail):
218 218 self.log_user()
219 219 username = 'repogrouperr'
220 220 groupname = u'repogroup_fail'
221 221
222 222 new_user = Session().query(User) \
223 223 .filter(User.username == username).one()
224 224 response = self.app.post(url('delete_user', id=new_user.user_id),
225 225 params={'_authentication_token': self.authentication_token()})
226 226 self.checkSessionFlash(response, 'User "%s" still '
227 227 'owns 1 repository groups and cannot be removed. '
228 228 'Switch owners or remove those repository groups: '
229 229 '%s' % (username, groupname))
230 230
231 231 # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner
232 232 # rg = RepoGroup.get_by_group_name(group_name=groupname)
233 233 # response = self.app.get(url('repos_groups', id=rg.group_id))
234 234
235 235 response = self.app.post(url('delete_repo_group', group_name=groupname),
236 236 params={'_authentication_token': self.authentication_token()})
237 237 self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
238 238
239 239 response = self.app.post(url('delete_user', id=new_user.user_id),
240 240 params={'_authentication_token': self.authentication_token()})
241 241 self.checkSessionFlash(response, 'Successfully deleted user')
242 242
243 243 def test_delete_user_group_err(self):
244 244 self.log_user()
245 245 username = 'usergrouperr'
246 246 groupname = u'usergroup_fail'
247 247
248 248 fixture.create_user(name=username)
249 249 ug = fixture.create_user_group(name=groupname, cur_user=username)
250 250
251 251 new_user = Session().query(User) \
252 252 .filter(User.username == username).one()
253 253 response = self.app.post(url('delete_user', id=new_user.user_id),
254 254 params={'_authentication_token': self.authentication_token()})
255 255 self.checkSessionFlash(response, 'User "%s" still '
256 256 'owns 1 user groups and cannot be removed. '
257 257 'Switch owners or remove those user groups: '
258 258 '%s' % (username, groupname))
259 259
260 260 # TODO: why do this fail?
261 261 #response = self.app.delete(url('delete_users_group', id=groupname))
262 262 #self.checkSessionFlash(response, 'Removed user group %s' % groupname)
263 263
264 264 fixture.destroy_user_group(ug.users_group_id)
265 265
266 266 response = self.app.post(url('delete_user', id=new_user.user_id),
267 267 params={'_authentication_token': self.authentication_token()})
268 268 self.checkSessionFlash(response, 'Successfully deleted user')
269 269
270 270 def test_edit(self):
271 271 self.log_user()
272 272 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
273 273 response = self.app.get(url('edit_user', id=user.user_id))
274 274
275 275 def test_add_perm_create_repo(self):
276 276 self.log_user()
277 277 perm_none = Permission.get_by_key('hg.create.none')
278 278 perm_create = Permission.get_by_key('hg.create.repository')
279 279
280 280 user = UserModel().create_or_update(username='dummy', password='qwe',
281 281 email='dummy', firstname=u'a',
282 282 lastname=u'b')
283 283 Session().commit()
284 284 uid = user.user_id
285 285
286 286 try:
287 287 #User should have None permission on creation repository
288 288 assert UserModel().has_perm(user, perm_none) == False
289 289 assert UserModel().has_perm(user, perm_create) == False
290 290
291 291 response = self.app.post(url('edit_user_perms_update', id=uid),
292 292 params=dict(create_repo_perm=True,
293 293 _authentication_token=self.authentication_token()))
294 294
295 295 perm_none = Permission.get_by_key('hg.create.none')
296 296 perm_create = Permission.get_by_key('hg.create.repository')
297 297
298 298 #User should have None permission on creation repository
299 299 assert UserModel().has_perm(uid, perm_none) == False
300 300 assert UserModel().has_perm(uid, perm_create) == True
301 301 finally:
302 302 UserModel().delete(uid)
303 303 Session().commit()
304 304
305 305 def test_revoke_perm_create_repo(self):
306 306 self.log_user()
307 307 perm_none = Permission.get_by_key('hg.create.none')
308 308 perm_create = Permission.get_by_key('hg.create.repository')
309 309
310 310 user = UserModel().create_or_update(username='dummy', password='qwe',
311 311 email='dummy', firstname=u'a',
312 312 lastname=u'b')
313 313 Session().commit()
314 314 uid = user.user_id
315 315
316 316 try:
317 317 #User should have None permission on creation repository
318 318 assert UserModel().has_perm(user, perm_none) == False
319 319 assert UserModel().has_perm(user, perm_create) == False
320 320
321 321 response = self.app.post(url('edit_user_perms_update', id=uid),
322 322 params=dict(_authentication_token=self.authentication_token()))
323 323
324 324 perm_none = Permission.get_by_key('hg.create.none')
325 325 perm_create = Permission.get_by_key('hg.create.repository')
326 326
327 327 #User should have None permission on creation repository
328 328 assert UserModel().has_perm(uid, perm_none) == True
329 329 assert UserModel().has_perm(uid, perm_create) == False
330 330 finally:
331 331 UserModel().delete(uid)
332 332 Session().commit()
333 333
334 334 def test_add_perm_fork_repo(self):
335 335 self.log_user()
336 336 perm_none = Permission.get_by_key('hg.fork.none')
337 337 perm_fork = Permission.get_by_key('hg.fork.repository')
338 338
339 339 user = UserModel().create_or_update(username='dummy', password='qwe',
340 340 email='dummy', firstname=u'a',
341 341 lastname=u'b')
342 342 Session().commit()
343 343 uid = user.user_id
344 344
345 345 try:
346 346 #User should have None permission on creation repository
347 347 assert UserModel().has_perm(user, perm_none) == False
348 348 assert UserModel().has_perm(user, perm_fork) == False
349 349
350 350 response = self.app.post(url('edit_user_perms_update', id=uid),
351 351 params=dict(create_repo_perm=True,
352 352 _authentication_token=self.authentication_token()))
353 353
354 354 perm_none = Permission.get_by_key('hg.create.none')
355 355 perm_create = Permission.get_by_key('hg.create.repository')
356 356
357 357 #User should have None permission on creation repository
358 358 assert UserModel().has_perm(uid, perm_none) == False
359 359 assert UserModel().has_perm(uid, perm_create) == True
360 360 finally:
361 361 UserModel().delete(uid)
362 362 Session().commit()
363 363
364 364 def test_revoke_perm_fork_repo(self):
365 365 self.log_user()
366 366 perm_none = Permission.get_by_key('hg.fork.none')
367 367 perm_fork = Permission.get_by_key('hg.fork.repository')
368 368
369 369 user = UserModel().create_or_update(username='dummy', password='qwe',
370 370 email='dummy', firstname=u'a',
371 371 lastname=u'b')
372 372 Session().commit()
373 373 uid = user.user_id
374 374
375 375 try:
376 376 #User should have None permission on creation repository
377 377 assert UserModel().has_perm(user, perm_none) == False
378 378 assert UserModel().has_perm(user, perm_fork) == False
379 379
380 380 response = self.app.post(url('edit_user_perms_update', id=uid),
381 381 params=dict(_authentication_token=self.authentication_token()))
382 382
383 383 perm_none = Permission.get_by_key('hg.create.none')
384 384 perm_create = Permission.get_by_key('hg.create.repository')
385 385
386 386 #User should have None permission on creation repository
387 387 assert UserModel().has_perm(uid, perm_none) == True
388 388 assert UserModel().has_perm(uid, perm_create) == False
389 389 finally:
390 390 UserModel().delete(uid)
391 391 Session().commit()
392 392
393 393 def test_ips(self):
394 394 self.log_user()
395 395 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
396 396 response = self.app.get(url('edit_user_ips', id=user.user_id))
397 397 response.mustcontain('All IP addresses are allowed')
398 398
399 399 @parametrize('test_name,ip,ip_range,failure', [
400 400 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
401 401 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
402 402 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
403 403 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
404 404 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
405 405 ('127_bad_ip', 'foobar', 'foobar', True),
406 406 ])
407 407 def test_add_ip(self, test_name, ip, ip_range, failure, auto_clear_ip_permissions):
408 408 self.log_user()
409 409 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
410 410 user_id = user.user_id
411 411
412 412 response = self.app.post(url('edit_user_ips_update', id=user_id),
413 413 params=dict(new_ip=ip, _authentication_token=self.authentication_token()))
414 414
415 415 if failure:
416 416 self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
417 417 response = self.app.get(url('edit_user_ips', id=user_id))
418 418 response.mustcontain(no=[ip])
419 419 response.mustcontain(no=[ip_range])
420 420
421 421 else:
422 422 response = self.app.get(url('edit_user_ips', id=user_id))
423 423 response.mustcontain(ip)
424 424 response.mustcontain(ip_range)
425 425
426 426 def test_delete_ip(self, auto_clear_ip_permissions):
427 427 self.log_user()
428 428 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
429 429 user_id = user.user_id
430 430 ip = '127.0.0.1/32'
431 431 ip_range = '127.0.0.1 - 127.0.0.1'
432 432 new_ip = UserModel().add_extra_ip(user_id, ip)
433 433 Session().commit()
434 434 new_ip_id = new_ip.ip_id
435 435
436 436 response = self.app.get(url('edit_user_ips', id=user_id))
437 437 response.mustcontain(ip)
438 438 response.mustcontain(ip_range)
439 439
440 440 self.app.post(url('edit_user_ips_delete', id=user_id),
441 441 params=dict(del_ip_id=new_ip_id, _authentication_token=self.authentication_token()))
442 442
443 443 response = self.app.get(url('edit_user_ips', id=user_id))
444 444 response.mustcontain('All IP addresses are allowed')
445 445 response.mustcontain(no=[ip])
446 446 response.mustcontain(no=[ip_range])
447 447
448 448 def test_api_keys(self):
449 449 self.log_user()
450 450
451 451 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
452 452 response = self.app.get(url('edit_user_api_keys', id=user.user_id))
453 453 response.mustcontain(user.api_key)
454 454 response.mustcontain('Expires: Never')
455 455
456 456 @parametrize('desc,lifetime', [
457 457 ('forever', -1),
458 458 ('5mins', 60*5),
459 459 ('30days', 60*60*24*30),
460 460 ])
461 461 def test_add_api_keys(self, desc, lifetime):
462 462 self.log_user()
463 463 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
464 464 user_id = user.user_id
465 465
466 466 response = self.app.post(url('edit_user_api_keys_update', id=user_id),
467 467 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
468 468 self.checkSessionFlash(response, 'API key successfully created')
469 469 try:
470 470 response = response.follow()
471 471 user = User.get(user_id)
472 472 for api_key in user.api_keys:
473 473 response.mustcontain(api_key)
474 474 finally:
475 475 for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
476 476 Session().delete(api_key)
477 477 Session().commit()
478 478
479 479 def test_remove_api_key(self):
480 480 self.log_user()
481 481 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
482 482 user_id = user.user_id
483 483
484 484 response = self.app.post(url('edit_user_api_keys_update', id=user_id),
485 485 {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
486 486 self.checkSessionFlash(response, 'API key successfully created')
487 487 response = response.follow()
488 488
489 489 #now delete our key
490 490 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
491 491 assert 1 == len(keys)
492 492
493 493 response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
494 494 {'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
495 495 self.checkSessionFlash(response, 'API key successfully deleted')
496 496 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
497 497 assert 0 == len(keys)
498 498
499 499 def test_reset_main_api_key(self):
500 500 self.log_user()
501 501 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
502 502 user_id = user.user_id
503 503 api_key = user.api_key
504 504 response = self.app.get(url('edit_user_api_keys', id=user_id))
505 505 response.mustcontain(api_key)
506 506 response.mustcontain('Expires: Never')
507 507
508 508 response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
509 509 {'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
510 510 self.checkSessionFlash(response, 'API key successfully reset')
511 511 response = response.follow()
512 512 response.mustcontain(no=[api_key])
513 513
514 514
515 515 class TestAdminUsersController_unittest(object):
516 516 """ Unit tests for the users controller """
517 517
518 518 def test_get_user_or_raise_if_default(self, monkeypatch):
519 519 # flash complains about an non-existing session
520 520 def flash_mock(*args, **kwargs):
521 521 pass
522 522 monkeypatch.setattr(h, 'flash', flash_mock)
523 523
524 524 u = UsersController()
525 525 # a regular user should work correctly
526 526 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
527 527 assert u._get_user_or_raise_if_default(user.user_id) == user
528 528 # the default user should raise
529 529 with pytest.raises(HTTPNotFound):
530 530 u._get_user_or_raise_if_default(User.get_default_user().user_id)
531 531
532 532
533 533 class TestAdminUsersControllerForDefaultUser(TestController):
534 534 """
535 535 Edit actions on the default user are not allowed.
536 536 Validate that they throw a 404 exception.
537 537 """
538 538 def test_edit_default_user(self):
539 539 self.log_user()
540 540 user = User.get_default_user()
541 541 response = self.app.get(url('edit_user', id=user.user_id), status=404)
542 542
543 543 def test_edit_advanced_default_user(self):
544 544 self.log_user()
545 545 user = User.get_default_user()
546 546 response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
547 547
548 548 # API keys
549 549 def test_edit_api_keys_default_user(self):
550 550 self.log_user()
551 551 user = User.get_default_user()
552 552 response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
553 553
554 554 def test_add_api_keys_default_user(self):
555 555 self.log_user()
556 556 user = User.get_default_user()
557 557 response = self.app.post(url('edit_user_api_keys_update', id=user.user_id),
558 558 {'_authentication_token': self.authentication_token()}, status=404)
559 559
560 560 def test_delete_api_keys_default_user(self):
561 561 self.log_user()
562 562 user = User.get_default_user()
563 563 response = self.app.post(url('edit_user_api_keys_delete', id=user.user_id),
564 564 {'_authentication_token': self.authentication_token()}, status=404)
565 565
566 566 # Permissions
567 567 def test_edit_perms_default_user(self):
568 568 self.log_user()
569 569 user = User.get_default_user()
570 570 response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
571 571
572 572 def test_update_perms_default_user(self):
573 573 self.log_user()
574 574 user = User.get_default_user()
575 575 response = self.app.post(url('edit_user_perms_update', id=user.user_id),
576 576 {'_authentication_token': self.authentication_token()}, status=404)
577 577
578 578 # Emails
579 579 def test_edit_emails_default_user(self):
580 580 self.log_user()
581 581 user = User.get_default_user()
582 582 response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
583 583
584 584 def test_add_emails_default_user(self):
585 585 self.log_user()
586 586 user = User.get_default_user()
587 587 response = self.app.post(url('edit_user_emails_update', id=user.user_id),
588 588 {'_authentication_token': self.authentication_token()}, status=404)
589 589
590 590 def test_delete_emails_default_user(self):
591 591 self.log_user()
592 592 user = User.get_default_user()
593 593 response = self.app.post(url('edit_user_emails_delete', id=user.user_id),
594 594 {'_authentication_token': self.authentication_token()}, status=404)
595 595
596 596 # IP addresses
597 597 # Add/delete of IP addresses for the default user is used to maintain
598 598 # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
599 599 def test_edit_ip_default_user(self):
600 600 self.log_user()
601 601 user = User.get_default_user()
602 602 response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
General Comments 0
You need to be logged in to leave comments. Login now