##// END OF EJS Templates
users: add extra checks on editing the default user...
Thomas De Schampheleire -
r5168:4e076ea7 default
parent child Browse files
Show More
@@ -1,484 +1,485 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 # This program is free software: you can redistribute it and/or modify
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
5 # (at your option) any later version.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU General Public License
12 # You should have received a copy of the GNU General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 """
14 """
15 kallithea.controllers.admin.users
15 kallithea.controllers.admin.users
16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
17
17
18 Users crud controller for pylons
18 Users crud controller for pylons
19
19
20 This file was forked by the Kallithea project in July 2014.
20 This file was forked by the Kallithea project in July 2014.
21 Original author and date, and relevant copyright and licensing information is below:
21 Original author and date, and relevant copyright and licensing information is below:
22 :created_on: Apr 4, 2010
22 :created_on: Apr 4, 2010
23 :author: marcink
23 :author: marcink
24 :copyright: (c) 2013 RhodeCode GmbH, and others.
24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 :license: GPLv3, see LICENSE.md for more details.
25 :license: GPLv3, see LICENSE.md for more details.
26 """
26 """
27
27
28 import logging
28 import logging
29 import traceback
29 import traceback
30 import formencode
30 import formencode
31
31
32 from formencode import htmlfill
32 from formencode import htmlfill
33 from pylons import request, tmpl_context as c, url, config
33 from pylons import request, tmpl_context as c, url, config
34 from pylons.controllers.util import redirect
34 from pylons.controllers.util import redirect
35 from pylons.i18n.translation import _
35 from pylons.i18n.translation import _
36 from sqlalchemy.sql.expression import func
36 from sqlalchemy.sql.expression import func
37 from webob.exc import HTTPNotFound
37 from webob.exc import HTTPNotFound
38
38
39 import kallithea
39 import kallithea
40 from kallithea.lib.exceptions import DefaultUserException, \
40 from kallithea.lib.exceptions import DefaultUserException, \
41 UserOwnsReposException, UserCreationError
41 UserOwnsReposException, UserCreationError
42 from kallithea.lib import helpers as h
42 from kallithea.lib import helpers as h
43 from kallithea.lib.auth import LoginRequired, HasPermissionAllDecorator, \
43 from kallithea.lib.auth import LoginRequired, HasPermissionAllDecorator, \
44 AuthUser
44 AuthUser
45 import kallithea.lib.auth_modules.auth_internal
45 import kallithea.lib.auth_modules.auth_internal
46 from kallithea.lib import auth_modules
46 from kallithea.lib import auth_modules
47 from kallithea.lib.base import BaseController, render
47 from kallithea.lib.base import BaseController, render
48 from kallithea.model.api_key import ApiKeyModel
48 from kallithea.model.api_key import ApiKeyModel
49
49
50 from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
50 from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
51 from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm
51 from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm
52 from kallithea.model.user import UserModel
52 from kallithea.model.user import UserModel
53 from kallithea.model.meta import Session
53 from kallithea.model.meta import Session
54 from kallithea.lib.utils import action_logger
54 from kallithea.lib.utils import action_logger
55 from kallithea.lib.compat import json
55 from kallithea.lib.compat import json
56 from kallithea.lib.utils2 import datetime_to_time, safe_int, generate_api_key
56 from kallithea.lib.utils2 import datetime_to_time, safe_int, generate_api_key
57
57
58 log = logging.getLogger(__name__)
58 log = logging.getLogger(__name__)
59
59
60
60
61 class UsersController(BaseController):
61 class UsersController(BaseController):
62 """REST Controller styled on the Atom Publishing Protocol"""
62 """REST Controller styled on the Atom Publishing Protocol"""
63
63
64 @LoginRequired()
64 @LoginRequired()
65 @HasPermissionAllDecorator('hg.admin')
65 @HasPermissionAllDecorator('hg.admin')
66 def __before__(self):
66 def __before__(self):
67 super(UsersController, self).__before__()
67 super(UsersController, self).__before__()
68 c.available_permissions = config['available_permissions']
68 c.available_permissions = config['available_permissions']
69 c.EXTERN_TYPE_INTERNAL = kallithea.EXTERN_TYPE_INTERNAL
69 c.EXTERN_TYPE_INTERNAL = kallithea.EXTERN_TYPE_INTERNAL
70
70
71 def index(self, format='html'):
71 def index(self, format='html'):
72 """GET /users: All items in the collection"""
72 """GET /users: All items in the collection"""
73 # url('users')
73 # url('users')
74
74
75 c.users_list = User.query().order_by(User.username)\
75 c.users_list = User.query().order_by(User.username)\
76 .filter(User.username != User.DEFAULT_USER)\
76 .filter(User.username != User.DEFAULT_USER)\
77 .order_by(func.lower(User.username))\
77 .order_by(func.lower(User.username))\
78 .all()
78 .all()
79
79
80 users_data = []
80 users_data = []
81 total_records = len(c.users_list)
81 total_records = len(c.users_list)
82 _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
82 _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
83 template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
83 template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
84
84
85 grav_tmpl = '<div class="gravatar">%s</div>'
85 grav_tmpl = '<div class="gravatar">%s</div>'
86
86
87 username = lambda user_id, username: (
87 username = lambda user_id, username: (
88 template.get_def("user_name")
88 template.get_def("user_name")
89 .render(user_id, username, _=_, h=h, c=c))
89 .render(user_id, username, _=_, h=h, c=c))
90
90
91 user_actions = lambda user_id, username: (
91 user_actions = lambda user_id, username: (
92 template.get_def("user_actions")
92 template.get_def("user_actions")
93 .render(user_id, username, _=_, h=h, c=c))
93 .render(user_id, username, _=_, h=h, c=c))
94
94
95 for user in c.users_list:
95 for user in c.users_list:
96 users_data.append({
96 users_data.append({
97 "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
97 "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
98 "raw_name": user.username,
98 "raw_name": user.username,
99 "username": username(user.user_id, user.username),
99 "username": username(user.user_id, user.username),
100 "firstname": h.escape(user.name),
100 "firstname": h.escape(user.name),
101 "lastname": h.escape(user.lastname),
101 "lastname": h.escape(user.lastname),
102 "last_login": h.fmt_date(user.last_login),
102 "last_login": h.fmt_date(user.last_login),
103 "last_login_raw": datetime_to_time(user.last_login),
103 "last_login_raw": datetime_to_time(user.last_login),
104 "active": h.boolicon(user.active),
104 "active": h.boolicon(user.active),
105 "admin": h.boolicon(user.admin),
105 "admin": h.boolicon(user.admin),
106 "extern_type": user.extern_type,
106 "extern_type": user.extern_type,
107 "extern_name": user.extern_name,
107 "extern_name": user.extern_name,
108 "action": user_actions(user.user_id, user.username),
108 "action": user_actions(user.user_id, user.username),
109 })
109 })
110
110
111 c.data = json.dumps({
111 c.data = json.dumps({
112 "totalRecords": total_records,
112 "totalRecords": total_records,
113 "startIndex": 0,
113 "startIndex": 0,
114 "sort": None,
114 "sort": None,
115 "dir": "asc",
115 "dir": "asc",
116 "records": users_data
116 "records": users_data
117 })
117 })
118
118
119 return render('admin/users/users.html')
119 return render('admin/users/users.html')
120
120
121 def create(self):
121 def create(self):
122 """POST /users: Create a new item"""
122 """POST /users: Create a new item"""
123 # url('users')
123 # url('users')
124 c.default_extern_type = auth_modules.auth_internal.KallitheaAuthPlugin.name
124 c.default_extern_type = auth_modules.auth_internal.KallitheaAuthPlugin.name
125 user_model = UserModel()
125 user_model = UserModel()
126 user_form = UserForm()()
126 user_form = UserForm()()
127 try:
127 try:
128 form_result = user_form.to_python(dict(request.POST))
128 form_result = user_form.to_python(dict(request.POST))
129 user = user_model.create(form_result)
129 user = user_model.create(form_result)
130 usr = form_result['username']
130 usr = form_result['username']
131 action_logger(self.authuser, 'admin_created_user:%s' % usr,
131 action_logger(self.authuser, 'admin_created_user:%s' % usr,
132 None, self.ip_addr, self.sa)
132 None, self.ip_addr, self.sa)
133 h.flash(h.literal(_('Created user %s') % h.link_to(h.escape(usr), url('edit_user', id=user.user_id))),
133 h.flash(h.literal(_('Created user %s') % h.link_to(h.escape(usr), url('edit_user', id=user.user_id))),
134 category='success')
134 category='success')
135 Session().commit()
135 Session().commit()
136 except formencode.Invalid, errors:
136 except formencode.Invalid, errors:
137 return htmlfill.render(
137 return htmlfill.render(
138 render('admin/users/user_add.html'),
138 render('admin/users/user_add.html'),
139 defaults=errors.value,
139 defaults=errors.value,
140 errors=errors.error_dict or {},
140 errors=errors.error_dict or {},
141 prefix_error=False,
141 prefix_error=False,
142 encoding="UTF-8",
142 encoding="UTF-8",
143 force_defaults=False)
143 force_defaults=False)
144 except UserCreationError, e:
144 except UserCreationError, e:
145 h.flash(e, 'error')
145 h.flash(e, 'error')
146 except Exception:
146 except Exception:
147 log.error(traceback.format_exc())
147 log.error(traceback.format_exc())
148 h.flash(_('Error occurred during creation of user %s') \
148 h.flash(_('Error occurred during creation of user %s') \
149 % request.POST.get('username'), category='error')
149 % request.POST.get('username'), category='error')
150 return redirect(url('users'))
150 return redirect(url('users'))
151
151
152 def new(self, format='html'):
152 def new(self, format='html'):
153 """GET /users/new: Form to create a new item"""
153 """GET /users/new: Form to create a new item"""
154 # url('new_user')
154 # url('new_user')
155 c.default_extern_type = auth_modules.auth_internal.KallitheaAuthPlugin.name
155 c.default_extern_type = auth_modules.auth_internal.KallitheaAuthPlugin.name
156 return render('admin/users/user_add.html')
156 return render('admin/users/user_add.html')
157
157
158 def update(self, id):
158 def update(self, id):
159 """PUT /users/id: Update an existing item"""
159 """PUT /users/id: Update an existing item"""
160 # Forms posted to this method should contain a hidden field:
160 # Forms posted to this method should contain a hidden field:
161 # <input type="hidden" name="_method" value="PUT" />
161 # <input type="hidden" name="_method" value="PUT" />
162 # Or using helpers:
162 # Or using helpers:
163 # h.form(url('update_user', id=ID),
163 # h.form(url('update_user', id=ID),
164 # method='put')
164 # method='put')
165 # url('user', id=ID)
165 # url('user', id=ID)
166 c.active = 'profile'
166 c.active = 'profile'
167 user_model = UserModel()
167 user_model = UserModel()
168 c.user = user_model.get(id)
168 c.user = user_model.get(id)
169 c.extern_type = c.user.extern_type
169 c.extern_type = c.user.extern_type
170 c.extern_name = c.user.extern_name
170 c.extern_name = c.user.extern_name
171 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
171 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
172 _form = UserForm(edit=True, old_data={'user_id': id,
172 _form = UserForm(edit=True, old_data={'user_id': id,
173 'email': c.user.email})()
173 'email': c.user.email})()
174 form_result = {}
174 form_result = {}
175 try:
175 try:
176 form_result = _form.to_python(dict(request.POST))
176 form_result = _form.to_python(dict(request.POST))
177 skip_attrs = ['extern_type', 'extern_name']
177 skip_attrs = ['extern_type', 'extern_name']
178 #TODO: plugin should define if username can be updated
178 #TODO: plugin should define if username can be updated
179 if c.extern_type != kallithea.EXTERN_TYPE_INTERNAL:
179 if c.extern_type != kallithea.EXTERN_TYPE_INTERNAL:
180 # forbid updating username for external accounts
180 # forbid updating username for external accounts
181 skip_attrs.append('username')
181 skip_attrs.append('username')
182
182
183 user_model.update(id, form_result, skip_attrs=skip_attrs)
183 user_model.update(id, form_result, skip_attrs=skip_attrs)
184 usr = form_result['username']
184 usr = form_result['username']
185 action_logger(self.authuser, 'admin_updated_user:%s' % usr,
185 action_logger(self.authuser, 'admin_updated_user:%s' % usr,
186 None, self.ip_addr, self.sa)
186 None, self.ip_addr, self.sa)
187 h.flash(_('User updated successfully'), category='success')
187 h.flash(_('User updated successfully'), category='success')
188 Session().commit()
188 Session().commit()
189 except formencode.Invalid, errors:
189 except formencode.Invalid, errors:
190 defaults = errors.value
190 defaults = errors.value
191 e = errors.error_dict or {}
191 e = errors.error_dict or {}
192 defaults.update({
192 defaults.update({
193 'create_repo_perm': user_model.has_perm(id,
193 'create_repo_perm': user_model.has_perm(id,
194 'hg.create.repository'),
194 'hg.create.repository'),
195 'fork_repo_perm': user_model.has_perm(id, 'hg.fork.repository'),
195 'fork_repo_perm': user_model.has_perm(id, 'hg.fork.repository'),
196 '_method': 'put'
196 '_method': 'put'
197 })
197 })
198 return htmlfill.render(
198 return htmlfill.render(
199 render('admin/users/user_edit.html'),
199 render('admin/users/user_edit.html'),
200 defaults=defaults,
200 defaults=defaults,
201 errors=e,
201 errors=e,
202 prefix_error=False,
202 prefix_error=False,
203 encoding="UTF-8",
203 encoding="UTF-8",
204 force_defaults=False)
204 force_defaults=False)
205 except Exception:
205 except Exception:
206 log.error(traceback.format_exc())
206 log.error(traceback.format_exc())
207 h.flash(_('Error occurred during update of user %s') \
207 h.flash(_('Error occurred during update of user %s') \
208 % form_result.get('username'), category='error')
208 % form_result.get('username'), category='error')
209 return redirect(url('edit_user', id=id))
209 return redirect(url('edit_user', id=id))
210
210
211 def delete(self, id):
211 def delete(self, id):
212 """DELETE /users/id: Delete an existing item"""
212 """DELETE /users/id: Delete an existing item"""
213 # Forms posted to this method should contain a hidden field:
213 # Forms posted to this method should contain a hidden field:
214 # <input type="hidden" name="_method" value="DELETE" />
214 # <input type="hidden" name="_method" value="DELETE" />
215 # Or using helpers:
215 # Or using helpers:
216 # h.form(url('delete_user', id=ID),
216 # h.form(url('delete_user', id=ID),
217 # method='delete')
217 # method='delete')
218 # url('user', id=ID)
218 # url('user', id=ID)
219 usr = User.get_or_404(id)
219 usr = User.get_or_404(id)
220 try:
220 try:
221 UserModel().delete(usr)
221 UserModel().delete(usr)
222 Session().commit()
222 Session().commit()
223 h.flash(_('Successfully deleted user'), category='success')
223 h.flash(_('Successfully deleted user'), category='success')
224 except (UserOwnsReposException, DefaultUserException), e:
224 except (UserOwnsReposException, DefaultUserException), e:
225 h.flash(e, category='warning')
225 h.flash(e, category='warning')
226 except Exception:
226 except Exception:
227 log.error(traceback.format_exc())
227 log.error(traceback.format_exc())
228 h.flash(_('An error occurred during deletion of user'),
228 h.flash(_('An error occurred during deletion of user'),
229 category='error')
229 category='error')
230 return redirect(url('users'))
230 return redirect(url('users'))
231
231
232 def show(self, id, format='html'):
232 def show(self, id, format='html'):
233 """GET /users/id: Show a specific item"""
233 """GET /users/id: Show a specific item"""
234 # url('user', id=ID)
234 # url('user', id=ID)
235 User.get_or_404(-1)
235 User.get_or_404(-1)
236
236
237 def _get_user_or_raise_if_default(self, id):
237 def _get_user_or_raise_if_default(self, id):
238 try:
238 try:
239 return User.get_or_404(id, allow_default=False)
239 return User.get_or_404(id, allow_default=False)
240 except DefaultUserException:
240 except DefaultUserException:
241 h.flash(_("The default user cannot be edited"), category='warning')
241 h.flash(_("The default user cannot be edited"), category='warning')
242 raise HTTPNotFound
242 raise HTTPNotFound
243
243
244 def edit(self, id, format='html'):
244 def edit(self, id, format='html'):
245 """GET /users/id/edit: Form to edit an existing item"""
245 """GET /users/id/edit: Form to edit an existing item"""
246 # url('edit_user', id=ID)
246 # url('edit_user', id=ID)
247 c.user = self._get_user_or_raise_if_default(id)
247 c.user = self._get_user_or_raise_if_default(id)
248 c.active = 'profile'
248 c.active = 'profile'
249 c.extern_type = c.user.extern_type
249 c.extern_type = c.user.extern_type
250 c.extern_name = c.user.extern_name
250 c.extern_name = c.user.extern_name
251 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
251 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
252
252
253 defaults = c.user.get_dict()
253 defaults = c.user.get_dict()
254 return htmlfill.render(
254 return htmlfill.render(
255 render('admin/users/user_edit.html'),
255 render('admin/users/user_edit.html'),
256 defaults=defaults,
256 defaults=defaults,
257 encoding="UTF-8",
257 encoding="UTF-8",
258 force_defaults=False)
258 force_defaults=False)
259
259
260 def edit_advanced(self, id):
260 def edit_advanced(self, id):
261 c.user = self._get_user_or_raise_if_default(id)
261 c.user = self._get_user_or_raise_if_default(id)
262 c.active = 'advanced'
262 c.active = 'advanced'
263 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
263 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
264
264
265 umodel = UserModel()
265 umodel = UserModel()
266 defaults = c.user.get_dict()
266 defaults = c.user.get_dict()
267 defaults.update({
267 defaults.update({
268 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
268 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
269 'create_user_group_perm': umodel.has_perm(c.user,
269 'create_user_group_perm': umodel.has_perm(c.user,
270 'hg.usergroup.create.true'),
270 'hg.usergroup.create.true'),
271 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
271 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
272 })
272 })
273 return htmlfill.render(
273 return htmlfill.render(
274 render('admin/users/user_edit.html'),
274 render('admin/users/user_edit.html'),
275 defaults=defaults,
275 defaults=defaults,
276 encoding="UTF-8",
276 encoding="UTF-8",
277 force_defaults=False)
277 force_defaults=False)
278
278
279 def edit_api_keys(self, id):
279 def edit_api_keys(self, id):
280 c.user = self._get_user_or_raise_if_default(id)
280 c.user = self._get_user_or_raise_if_default(id)
281 c.active = 'api_keys'
281 c.active = 'api_keys'
282 show_expired = True
282 show_expired = True
283 c.lifetime_values = [
283 c.lifetime_values = [
284 (str(-1), _('Forever')),
284 (str(-1), _('Forever')),
285 (str(5), _('5 minutes')),
285 (str(5), _('5 minutes')),
286 (str(60), _('1 hour')),
286 (str(60), _('1 hour')),
287 (str(60 * 24), _('1 day')),
287 (str(60 * 24), _('1 day')),
288 (str(60 * 24 * 30), _('1 month')),
288 (str(60 * 24 * 30), _('1 month')),
289 ]
289 ]
290 c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
290 c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
291 c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id,
291 c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id,
292 show_expired=show_expired)
292 show_expired=show_expired)
293 defaults = c.user.get_dict()
293 defaults = c.user.get_dict()
294 return htmlfill.render(
294 return htmlfill.render(
295 render('admin/users/user_edit.html'),
295 render('admin/users/user_edit.html'),
296 defaults=defaults,
296 defaults=defaults,
297 encoding="UTF-8",
297 encoding="UTF-8",
298 force_defaults=False)
298 force_defaults=False)
299
299
300 def add_api_key(self, id):
300 def add_api_key(self, id):
301 c.user = self._get_user_or_raise_if_default(id)
301 c.user = self._get_user_or_raise_if_default(id)
302
302
303 lifetime = safe_int(request.POST.get('lifetime'), -1)
303 lifetime = safe_int(request.POST.get('lifetime'), -1)
304 description = request.POST.get('description')
304 description = request.POST.get('description')
305 ApiKeyModel().create(c.user.user_id, description, lifetime)
305 ApiKeyModel().create(c.user.user_id, description, lifetime)
306 Session().commit()
306 Session().commit()
307 h.flash(_("API key successfully created"), category='success')
307 h.flash(_("API key successfully created"), category='success')
308 return redirect(url('edit_user_api_keys', id=c.user.user_id))
308 return redirect(url('edit_user_api_keys', id=c.user.user_id))
309
309
310 def delete_api_key(self, id):
310 def delete_api_key(self, id):
311 c.user = self._get_user_or_raise_if_default(id)
311 c.user = self._get_user_or_raise_if_default(id)
312
312
313 api_key = request.POST.get('del_api_key')
313 api_key = request.POST.get('del_api_key')
314 if request.POST.get('del_api_key_builtin'):
314 if request.POST.get('del_api_key_builtin'):
315 user = User.get(c.user.user_id)
315 user = User.get(c.user.user_id)
316 if user:
316 if user:
317 user.api_key = generate_api_key(user.username)
317 user.api_key = generate_api_key(user.username)
318 Session().add(user)
318 Session().add(user)
319 Session().commit()
319 Session().commit()
320 h.flash(_("API key successfully reset"), category='success')
320 h.flash(_("API key successfully reset"), category='success')
321 elif api_key:
321 elif api_key:
322 ApiKeyModel().delete(api_key, c.user.user_id)
322 ApiKeyModel().delete(api_key, c.user.user_id)
323 Session().commit()
323 Session().commit()
324 h.flash(_("API key successfully deleted"), category='success')
324 h.flash(_("API key successfully deleted"), category='success')
325
325
326 return redirect(url('edit_user_api_keys', id=c.user.user_id))
326 return redirect(url('edit_user_api_keys', id=c.user.user_id))
327
327
328 def update_account(self, id):
328 def update_account(self, id):
329 pass
329 pass
330
330
331 def edit_perms(self, id):
331 def edit_perms(self, id):
332 c.user = self._get_user_or_raise_if_default(id)
332 c.user = self._get_user_or_raise_if_default(id)
333 c.active = 'perms'
333 c.active = 'perms'
334 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
334 c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
335
335
336 umodel = UserModel()
336 umodel = UserModel()
337 defaults = c.user.get_dict()
337 defaults = c.user.get_dict()
338 defaults.update({
338 defaults.update({
339 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
339 'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
340 'create_user_group_perm': umodel.has_perm(c.user,
340 'create_user_group_perm': umodel.has_perm(c.user,
341 'hg.usergroup.create.true'),
341 'hg.usergroup.create.true'),
342 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
342 'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
343 })
343 })
344 return htmlfill.render(
344 return htmlfill.render(
345 render('admin/users/user_edit.html'),
345 render('admin/users/user_edit.html'),
346 defaults=defaults,
346 defaults=defaults,
347 encoding="UTF-8",
347 encoding="UTF-8",
348 force_defaults=False)
348 force_defaults=False)
349
349
350 def update_perms(self, id):
350 def update_perms(self, id):
351 """PUT /users_perm/id: Update an existing item"""
351 """PUT /users_perm/id: Update an existing item"""
352 # url('user_perm', id=ID, method='put')
352 # url('user_perm', id=ID, method='put')
353 user = User.get_or_404(id)
353 user = self._get_user_or_raise_if_default(id)
354
354
355 try:
355 try:
356 form = CustomDefaultPermissionsForm()()
356 form = CustomDefaultPermissionsForm()()
357 form_result = form.to_python(request.POST)
357 form_result = form.to_python(request.POST)
358
358
359 inherit_perms = form_result['inherit_default_permissions']
359 inherit_perms = form_result['inherit_default_permissions']
360 user.inherit_default_permissions = inherit_perms
360 user.inherit_default_permissions = inherit_perms
361 Session().add(user)
361 Session().add(user)
362 user_model = UserModel()
362 user_model = UserModel()
363
363
364 defs = UserToPerm.query()\
364 defs = UserToPerm.query()\
365 .filter(UserToPerm.user == user)\
365 .filter(UserToPerm.user == user)\
366 .all()
366 .all()
367 for ug in defs:
367 for ug in defs:
368 Session().delete(ug)
368 Session().delete(ug)
369
369
370 if form_result['create_repo_perm']:
370 if form_result['create_repo_perm']:
371 user_model.grant_perm(id, 'hg.create.repository')
371 user_model.grant_perm(id, 'hg.create.repository')
372 else:
372 else:
373 user_model.grant_perm(id, 'hg.create.none')
373 user_model.grant_perm(id, 'hg.create.none')
374 if form_result['create_user_group_perm']:
374 if form_result['create_user_group_perm']:
375 user_model.grant_perm(id, 'hg.usergroup.create.true')
375 user_model.grant_perm(id, 'hg.usergroup.create.true')
376 else:
376 else:
377 user_model.grant_perm(id, 'hg.usergroup.create.false')
377 user_model.grant_perm(id, 'hg.usergroup.create.false')
378 if form_result['fork_repo_perm']:
378 if form_result['fork_repo_perm']:
379 user_model.grant_perm(id, 'hg.fork.repository')
379 user_model.grant_perm(id, 'hg.fork.repository')
380 else:
380 else:
381 user_model.grant_perm(id, 'hg.fork.none')
381 user_model.grant_perm(id, 'hg.fork.none')
382 h.flash(_("Updated permissions"), category='success')
382 h.flash(_("Updated permissions"), category='success')
383 Session().commit()
383 Session().commit()
384 except Exception:
384 except Exception:
385 log.error(traceback.format_exc())
385 log.error(traceback.format_exc())
386 h.flash(_('An error occurred during permissions saving'),
386 h.flash(_('An error occurred during permissions saving'),
387 category='error')
387 category='error')
388 return redirect(url('edit_user_perms', id=id))
388 return redirect(url('edit_user_perms', id=id))
389
389
390 def edit_emails(self, id):
390 def edit_emails(self, id):
391 c.user = self._get_user_or_raise_if_default(id)
391 c.user = self._get_user_or_raise_if_default(id)
392 c.active = 'emails'
392 c.active = 'emails'
393 c.user_email_map = UserEmailMap.query()\
393 c.user_email_map = UserEmailMap.query()\
394 .filter(UserEmailMap.user == c.user).all()
394 .filter(UserEmailMap.user == c.user).all()
395
395
396 defaults = c.user.get_dict()
396 defaults = c.user.get_dict()
397 return htmlfill.render(
397 return htmlfill.render(
398 render('admin/users/user_edit.html'),
398 render('admin/users/user_edit.html'),
399 defaults=defaults,
399 defaults=defaults,
400 encoding="UTF-8",
400 encoding="UTF-8",
401 force_defaults=False)
401 force_defaults=False)
402
402
403 def add_email(self, id):
403 def add_email(self, id):
404 """POST /user_emails:Add an existing item"""
404 """POST /user_emails:Add an existing item"""
405 # url('user_emails', id=ID, method='put')
405 # url('user_emails', id=ID, method='put')
406
406 user = self._get_user_or_raise_if_default(id)
407 email = request.POST.get('new_email')
407 email = request.POST.get('new_email')
408 user_model = UserModel()
408 user_model = UserModel()
409
409
410 try:
410 try:
411 user_model.add_extra_email(id, email)
411 user_model.add_extra_email(id, email)
412 Session().commit()
412 Session().commit()
413 h.flash(_("Added email %s to user") % email, category='success')
413 h.flash(_("Added email %s to user") % email, category='success')
414 except formencode.Invalid, error:
414 except formencode.Invalid, error:
415 msg = error.error_dict['email']
415 msg = error.error_dict['email']
416 h.flash(msg, category='error')
416 h.flash(msg, category='error')
417 except Exception:
417 except Exception:
418 log.error(traceback.format_exc())
418 log.error(traceback.format_exc())
419 h.flash(_('An error occurred during email saving'),
419 h.flash(_('An error occurred during email saving'),
420 category='error')
420 category='error')
421 return redirect(url('edit_user_emails', id=id))
421 return redirect(url('edit_user_emails', id=id))
422
422
423 def delete_email(self, id):
423 def delete_email(self, id):
424 """DELETE /user_emails_delete/id: Delete an existing item"""
424 """DELETE /user_emails_delete/id: Delete an existing item"""
425 # url('user_emails_delete', id=ID, method='delete')
425 # url('user_emails_delete', id=ID, method='delete')
426 user = self._get_user_or_raise_if_default(id)
426 email_id = request.POST.get('del_email_id')
427 email_id = request.POST.get('del_email_id')
427 user_model = UserModel()
428 user_model = UserModel()
428 user_model.delete_extra_email(id, email_id)
429 user_model.delete_extra_email(id, email_id)
429 Session().commit()
430 Session().commit()
430 h.flash(_("Removed email from user"), category='success')
431 h.flash(_("Removed email from user"), category='success')
431 return redirect(url('edit_user_emails', id=id))
432 return redirect(url('edit_user_emails', id=id))
432
433
433 def edit_ips(self, id):
434 def edit_ips(self, id):
434 c.user = self._get_user_or_raise_if_default(id)
435 c.user = self._get_user_or_raise_if_default(id)
435 c.active = 'ips'
436 c.active = 'ips'
436 c.user_ip_map = UserIpMap.query()\
437 c.user_ip_map = UserIpMap.query()\
437 .filter(UserIpMap.user == c.user).all()
438 .filter(UserIpMap.user == c.user).all()
438
439
439 c.inherit_default_ips = c.user.inherit_default_permissions
440 c.inherit_default_ips = c.user.inherit_default_permissions
440 c.default_user_ip_map = UserIpMap.query()\
441 c.default_user_ip_map = UserIpMap.query()\
441 .filter(UserIpMap.user == User.get_default_user()).all()
442 .filter(UserIpMap.user == User.get_default_user()).all()
442
443
443 defaults = c.user.get_dict()
444 defaults = c.user.get_dict()
444 return htmlfill.render(
445 return htmlfill.render(
445 render('admin/users/user_edit.html'),
446 render('admin/users/user_edit.html'),
446 defaults=defaults,
447 defaults=defaults,
447 encoding="UTF-8",
448 encoding="UTF-8",
448 force_defaults=False)
449 force_defaults=False)
449
450
450 def add_ip(self, id):
451 def add_ip(self, id):
451 """POST /user_ips:Add an existing item"""
452 """POST /user_ips:Add an existing item"""
452 # url('user_ips', id=ID, method='put')
453 # url('user_ips', id=ID, method='put')
453
454
454 ip = request.POST.get('new_ip')
455 ip = request.POST.get('new_ip')
455 user_model = UserModel()
456 user_model = UserModel()
456
457
457 try:
458 try:
458 user_model.add_extra_ip(id, ip)
459 user_model.add_extra_ip(id, ip)
459 Session().commit()
460 Session().commit()
460 h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
461 h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
461 except formencode.Invalid, error:
462 except formencode.Invalid, error:
462 msg = error.error_dict['ip']
463 msg = error.error_dict['ip']
463 h.flash(msg, category='error')
464 h.flash(msg, category='error')
464 except Exception:
465 except Exception:
465 log.error(traceback.format_exc())
466 log.error(traceback.format_exc())
466 h.flash(_('An error occurred during ip saving'),
467 h.flash(_('An error occurred during ip saving'),
467 category='error')
468 category='error')
468
469
469 if 'default_user' in request.POST:
470 if 'default_user' in request.POST:
470 return redirect(url('admin_permissions_ips'))
471 return redirect(url('admin_permissions_ips'))
471 return redirect(url('edit_user_ips', id=id))
472 return redirect(url('edit_user_ips', id=id))
472
473
473 def delete_ip(self, id):
474 def delete_ip(self, id):
474 """DELETE /user_ips_delete/id: Delete an existing item"""
475 """DELETE /user_ips_delete/id: Delete an existing item"""
475 # url('user_ips_delete', id=ID, method='delete')
476 # url('user_ips_delete', id=ID, method='delete')
476 ip_id = request.POST.get('del_ip_id')
477 ip_id = request.POST.get('del_ip_id')
477 user_model = UserModel()
478 user_model = UserModel()
478 user_model.delete_extra_ip(id, ip_id)
479 user_model.delete_extra_ip(id, ip_id)
479 Session().commit()
480 Session().commit()
480 h.flash(_("Removed IP address from user whitelist"), category='success')
481 h.flash(_("Removed IP address from user whitelist"), category='success')
481
482
482 if 'default_user' in request.POST:
483 if 'default_user' in request.POST:
483 return redirect(url('admin_permissions_ips'))
484 return redirect(url('admin_permissions_ips'))
484 return redirect(url('edit_user_ips', id=id))
485 return redirect(url('edit_user_ips', id=id))
@@ -1,578 +1,596 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 # This program is free software: you can redistribute it and/or modify
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
5 # (at your option) any later version.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU General Public License
12 # You should have received a copy of the GNU General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14
14
15 from sqlalchemy.orm.exc import NoResultFound
15 from sqlalchemy.orm.exc import NoResultFound
16 from webob.exc import HTTPNotFound
16 from webob.exc import HTTPNotFound
17
17
18 from kallithea.tests import *
18 from kallithea.tests import *
19 from kallithea.tests.fixture import Fixture
19 from kallithea.tests.fixture import Fixture
20 from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys
20 from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys
21 from kallithea.lib.auth import check_password
21 from kallithea.lib.auth import check_password
22 from kallithea.model.user import UserModel
22 from kallithea.model.user import UserModel
23 from kallithea.model import validators
23 from kallithea.model import validators
24 from kallithea.lib import helpers as h
24 from kallithea.lib import helpers as h
25 from kallithea.model.meta import Session
25 from kallithea.model.meta import Session
26
26
27 fixture = Fixture()
27 fixture = Fixture()
28
28
29
29
30 class TestAdminUsersController(TestController):
30 class TestAdminUsersController(TestController):
31 test_user_1 = 'testme'
31 test_user_1 = 'testme'
32
32
33 @classmethod
33 @classmethod
34 def teardown_class(cls):
34 def teardown_class(cls):
35 if User.get_by_username(cls.test_user_1):
35 if User.get_by_username(cls.test_user_1):
36 UserModel().delete(cls.test_user_1)
36 UserModel().delete(cls.test_user_1)
37 Session().commit()
37 Session().commit()
38
38
39 def test_index(self):
39 def test_index(self):
40 self.log_user()
40 self.log_user()
41 response = self.app.get(url('users'))
41 response = self.app.get(url('users'))
42 # Test response...
42 # Test response...
43
43
44 def test_create(self):
44 def test_create(self):
45 self.log_user()
45 self.log_user()
46 username = 'newtestuser'
46 username = 'newtestuser'
47 password = 'test12'
47 password = 'test12'
48 password_confirmation = password
48 password_confirmation = password
49 name = 'name'
49 name = 'name'
50 lastname = 'lastname'
50 lastname = 'lastname'
51 email = 'mail@mail.com'
51 email = 'mail@mail.com'
52
52
53 response = self.app.post(url('users'),
53 response = self.app.post(url('users'),
54 {'username': username,
54 {'username': username,
55 'password': password,
55 'password': password,
56 'password_confirmation': password_confirmation,
56 'password_confirmation': password_confirmation,
57 'firstname': name,
57 'firstname': name,
58 'active': True,
58 'active': True,
59 'lastname': lastname,
59 'lastname': lastname,
60 'extern_name': 'internal',
60 'extern_name': 'internal',
61 'extern_type': 'internal',
61 'extern_type': 'internal',
62 'email': email,
62 'email': email,
63 '_authentication_token': self.authentication_token()})
63 '_authentication_token': self.authentication_token()})
64
64
65 self.checkSessionFlash(response, '''Created user <a href="/_admin/users/''')
65 self.checkSessionFlash(response, '''Created user <a href="/_admin/users/''')
66 self.checkSessionFlash(response, '''/edit">%s</a>''' % (username))
66 self.checkSessionFlash(response, '''/edit">%s</a>''' % (username))
67
67
68 new_user = Session().query(User).\
68 new_user = Session().query(User).\
69 filter(User.username == username).one()
69 filter(User.username == username).one()
70
70
71 self.assertEqual(new_user.username, username)
71 self.assertEqual(new_user.username, username)
72 self.assertEqual(check_password(password, new_user.password), True)
72 self.assertEqual(check_password(password, new_user.password), True)
73 self.assertEqual(new_user.name, name)
73 self.assertEqual(new_user.name, name)
74 self.assertEqual(new_user.lastname, lastname)
74 self.assertEqual(new_user.lastname, lastname)
75 self.assertEqual(new_user.email, email)
75 self.assertEqual(new_user.email, email)
76
76
77 response.follow()
77 response.follow()
78 response = response.follow()
78 response = response.follow()
79 response.mustcontain("""newtestuser""")
79 response.mustcontain("""newtestuser""")
80
80
81 def test_create_err(self):
81 def test_create_err(self):
82 self.log_user()
82 self.log_user()
83 username = 'new_user'
83 username = 'new_user'
84 password = ''
84 password = ''
85 name = 'name'
85 name = 'name'
86 lastname = 'lastname'
86 lastname = 'lastname'
87 email = 'errmail.com'
87 email = 'errmail.com'
88
88
89 response = self.app.post(url('users'), {'username': username,
89 response = self.app.post(url('users'), {'username': username,
90 'password': password,
90 'password': password,
91 'name': name,
91 'name': name,
92 'active': False,
92 'active': False,
93 'lastname': lastname,
93 'lastname': lastname,
94 'email': email,
94 'email': email,
95 '_authentication_token': self.authentication_token()})
95 '_authentication_token': self.authentication_token()})
96
96
97 msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
97 msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
98 msg = h.html_escape(msg % {'username': 'new_user'})
98 msg = h.html_escape(msg % {'username': 'new_user'})
99 response.mustcontain("""<span class="error-message">%s</span>""" % msg)
99 response.mustcontain("""<span class="error-message">%s</span>""" % msg)
100 response.mustcontain("""<span class="error-message">Please enter a value</span>""")
100 response.mustcontain("""<span class="error-message">Please enter a value</span>""")
101 response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
101 response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
102
102
103 def get_user():
103 def get_user():
104 Session().query(User).filter(User.username == username).one()
104 Session().query(User).filter(User.username == username).one()
105
105
106 self.assertRaises(NoResultFound, get_user), 'found user in database'
106 self.assertRaises(NoResultFound, get_user), 'found user in database'
107
107
108 def test_new(self):
108 def test_new(self):
109 self.log_user()
109 self.log_user()
110 response = self.app.get(url('new_user'))
110 response = self.app.get(url('new_user'))
111
111
112 @parameterized.expand(
112 @parameterized.expand(
113 [('firstname', {'firstname': 'new_username'}),
113 [('firstname', {'firstname': 'new_username'}),
114 ('lastname', {'lastname': 'new_username'}),
114 ('lastname', {'lastname': 'new_username'}),
115 ('admin', {'admin': True}),
115 ('admin', {'admin': True}),
116 ('admin', {'admin': False}),
116 ('admin', {'admin': False}),
117 ('extern_type', {'extern_type': 'ldap'}),
117 ('extern_type', {'extern_type': 'ldap'}),
118 ('extern_type', {'extern_type': None}),
118 ('extern_type', {'extern_type': None}),
119 ('extern_name', {'extern_name': 'test'}),
119 ('extern_name', {'extern_name': 'test'}),
120 ('extern_name', {'extern_name': None}),
120 ('extern_name', {'extern_name': None}),
121 ('active', {'active': False}),
121 ('active', {'active': False}),
122 ('active', {'active': True}),
122 ('active', {'active': True}),
123 ('email', {'email': 'some@email.com'}),
123 ('email', {'email': 'some@email.com'}),
124 # ('new_password', {'new_password': 'foobar123',
124 # ('new_password', {'new_password': 'foobar123',
125 # 'password_confirmation': 'foobar123'})
125 # 'password_confirmation': 'foobar123'})
126 ])
126 ])
127 def test_update(self, name, attrs):
127 def test_update(self, name, attrs):
128 self.log_user()
128 self.log_user()
129 usr = fixture.create_user(self.test_user_1, password='qweqwe',
129 usr = fixture.create_user(self.test_user_1, password='qweqwe',
130 email='testme@example.com',
130 email='testme@example.com',
131 extern_type='internal',
131 extern_type='internal',
132 extern_name=self.test_user_1,
132 extern_name=self.test_user_1,
133 skip_if_exists=True)
133 skip_if_exists=True)
134 Session().commit()
134 Session().commit()
135 params = usr.get_api_data(True)
135 params = usr.get_api_data(True)
136 params.update({'password_confirmation': ''})
136 params.update({'password_confirmation': ''})
137 params.update({'new_password': ''})
137 params.update({'new_password': ''})
138 params.update(attrs)
138 params.update(attrs)
139 if name == 'email':
139 if name == 'email':
140 params['emails'] = [attrs['email']]
140 params['emails'] = [attrs['email']]
141 if name == 'extern_type':
141 if name == 'extern_type':
142 #cannot update this via form, expected value is original one
142 #cannot update this via form, expected value is original one
143 params['extern_type'] = "internal"
143 params['extern_type'] = "internal"
144 if name == 'extern_name':
144 if name == 'extern_name':
145 #cannot update this via form, expected value is original one
145 #cannot update this via form, expected value is original one
146 params['extern_name'] = self.test_user_1
146 params['extern_name'] = self.test_user_1
147 # special case since this user is not
147 # special case since this user is not
148 # logged in yet his data is not filled
148 # logged in yet his data is not filled
149 # so we use creation data
149 # so we use creation data
150
150
151 params.update({'_authentication_token': self.authentication_token()})
151 params.update({'_authentication_token': self.authentication_token()})
152 response = self.app.put(url('user', id=usr.user_id), params)
152 response = self.app.put(url('user', id=usr.user_id), params)
153 self.checkSessionFlash(response, 'User updated successfully')
153 self.checkSessionFlash(response, 'User updated successfully')
154 params.pop('_authentication_token')
154 params.pop('_authentication_token')
155
155
156 updated_user = User.get_by_username(self.test_user_1)
156 updated_user = User.get_by_username(self.test_user_1)
157 updated_params = updated_user.get_api_data(True)
157 updated_params = updated_user.get_api_data(True)
158 updated_params.update({'password_confirmation': ''})
158 updated_params.update({'password_confirmation': ''})
159 updated_params.update({'new_password': ''})
159 updated_params.update({'new_password': ''})
160
160
161 self.assertEqual(params, updated_params)
161 self.assertEqual(params, updated_params)
162
162
163 def test_delete(self):
163 def test_delete(self):
164 self.log_user()
164 self.log_user()
165 username = 'newtestuserdeleteme'
165 username = 'newtestuserdeleteme'
166
166
167 fixture.create_user(name=username)
167 fixture.create_user(name=username)
168
168
169 new_user = Session().query(User)\
169 new_user = Session().query(User)\
170 .filter(User.username == username).one()
170 .filter(User.username == username).one()
171 response = self.app.delete(url('user', id=new_user.user_id))
171 response = self.app.delete(url('user', id=new_user.user_id))
172
172
173 self.checkSessionFlash(response, 'Successfully deleted user')
173 self.checkSessionFlash(response, 'Successfully deleted user')
174
174
175 def test_delete_repo_err(self):
175 def test_delete_repo_err(self):
176 self.log_user()
176 self.log_user()
177 username = 'repoerr'
177 username = 'repoerr'
178 reponame = 'repoerr_fail'
178 reponame = 'repoerr_fail'
179
179
180 fixture.create_user(name=username)
180 fixture.create_user(name=username)
181 fixture.create_repo(name=reponame, cur_user=username)
181 fixture.create_repo(name=reponame, cur_user=username)
182
182
183 new_user = Session().query(User)\
183 new_user = Session().query(User)\
184 .filter(User.username == username).one()
184 .filter(User.username == username).one()
185 response = self.app.delete(url('user', id=new_user.user_id))
185 response = self.app.delete(url('user', id=new_user.user_id))
186 self.checkSessionFlash(response, 'User "%s" still '
186 self.checkSessionFlash(response, 'User "%s" still '
187 'owns 1 repositories and cannot be removed. '
187 'owns 1 repositories and cannot be removed. '
188 'Switch owners or remove those repositories: '
188 'Switch owners or remove those repositories: '
189 '%s' % (username, reponame))
189 '%s' % (username, reponame))
190
190
191 response = self.app.delete(url('repo', repo_name=reponame))
191 response = self.app.delete(url('repo', repo_name=reponame))
192 self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
192 self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
193
193
194 response = self.app.delete(url('user', id=new_user.user_id))
194 response = self.app.delete(url('user', id=new_user.user_id))
195 self.checkSessionFlash(response, 'Successfully deleted user')
195 self.checkSessionFlash(response, 'Successfully deleted user')
196
196
197 def test_delete_repo_group_err(self):
197 def test_delete_repo_group_err(self):
198 self.log_user()
198 self.log_user()
199 username = 'repogrouperr'
199 username = 'repogrouperr'
200 groupname = 'repogroup_fail'
200 groupname = 'repogroup_fail'
201
201
202 fixture.create_user(name=username)
202 fixture.create_user(name=username)
203 fixture.create_repo_group(name=groupname, cur_user=username)
203 fixture.create_repo_group(name=groupname, cur_user=username)
204
204
205 new_user = Session().query(User)\
205 new_user = Session().query(User)\
206 .filter(User.username == username).one()
206 .filter(User.username == username).one()
207 response = self.app.delete(url('user', id=new_user.user_id))
207 response = self.app.delete(url('user', id=new_user.user_id))
208 self.checkSessionFlash(response, 'User "%s" still '
208 self.checkSessionFlash(response, 'User "%s" still '
209 'owns 1 repository groups and cannot be removed. '
209 'owns 1 repository groups and cannot be removed. '
210 'Switch owners or remove those repository groups: '
210 'Switch owners or remove those repository groups: '
211 '%s' % (username, groupname))
211 '%s' % (username, groupname))
212
212
213 # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner
213 # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner
214 # rg = RepoGroup.get_by_group_name(group_name=groupname)
214 # rg = RepoGroup.get_by_group_name(group_name=groupname)
215 # response = self.app.get(url('repos_groups', id=rg.group_id))
215 # response = self.app.get(url('repos_groups', id=rg.group_id))
216
216
217 response = self.app.delete(url('delete_repo_group', group_name=groupname))
217 response = self.app.delete(url('delete_repo_group', group_name=groupname))
218 self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
218 self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
219
219
220 response = self.app.delete(url('user', id=new_user.user_id))
220 response = self.app.delete(url('user', id=new_user.user_id))
221 self.checkSessionFlash(response, 'Successfully deleted user')
221 self.checkSessionFlash(response, 'Successfully deleted user')
222
222
223 def test_delete_user_group_err(self):
223 def test_delete_user_group_err(self):
224 self.log_user()
224 self.log_user()
225 username = 'usergrouperr'
225 username = 'usergrouperr'
226 groupname = 'usergroup_fail'
226 groupname = 'usergroup_fail'
227
227
228 fixture.create_user(name=username)
228 fixture.create_user(name=username)
229 ug = fixture.create_user_group(name=groupname, cur_user=username)
229 ug = fixture.create_user_group(name=groupname, cur_user=username)
230
230
231 new_user = Session().query(User)\
231 new_user = Session().query(User)\
232 .filter(User.username == username).one()
232 .filter(User.username == username).one()
233 response = self.app.delete(url('user', id=new_user.user_id))
233 response = self.app.delete(url('user', id=new_user.user_id))
234 self.checkSessionFlash(response, 'User "%s" still '
234 self.checkSessionFlash(response, 'User "%s" still '
235 'owns 1 user groups and cannot be removed. '
235 'owns 1 user groups and cannot be removed. '
236 'Switch owners or remove those user groups: '
236 'Switch owners or remove those user groups: '
237 '%s' % (username, groupname))
237 '%s' % (username, groupname))
238
238
239 # TODO: why do this fail?
239 # TODO: why do this fail?
240 #response = self.app.delete(url('delete_users_group', id=groupname))
240 #response = self.app.delete(url('delete_users_group', id=groupname))
241 #self.checkSessionFlash(response, 'Removed user group %s' % groupname)
241 #self.checkSessionFlash(response, 'Removed user group %s' % groupname)
242
242
243 fixture.destroy_user_group(ug.users_group_id)
243 fixture.destroy_user_group(ug.users_group_id)
244
244
245 response = self.app.delete(url('user', id=new_user.user_id))
245 response = self.app.delete(url('user', id=new_user.user_id))
246 self.checkSessionFlash(response, 'Successfully deleted user')
246 self.checkSessionFlash(response, 'Successfully deleted user')
247
247
248 def test_show(self):
248 def test_show(self):
249 response = self.app.get(url('user', id=1))
249 response = self.app.get(url('user', id=1))
250
250
251 def test_edit(self):
251 def test_edit(self):
252 self.log_user()
252 self.log_user()
253 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
253 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
254 response = self.app.get(url('edit_user', id=user.user_id))
254 response = self.app.get(url('edit_user', id=user.user_id))
255
255
256 def test_add_perm_create_repo(self):
256 def test_add_perm_create_repo(self):
257 self.log_user()
257 self.log_user()
258 perm_none = Permission.get_by_key('hg.create.none')
258 perm_none = Permission.get_by_key('hg.create.none')
259 perm_create = Permission.get_by_key('hg.create.repository')
259 perm_create = Permission.get_by_key('hg.create.repository')
260
260
261 user = UserModel().create_or_update(username='dummy', password='qwe',
261 user = UserModel().create_or_update(username='dummy', password='qwe',
262 email='dummy', firstname='a',
262 email='dummy', firstname='a',
263 lastname='b')
263 lastname='b')
264 Session().commit()
264 Session().commit()
265 uid = user.user_id
265 uid = user.user_id
266
266
267 try:
267 try:
268 #User should have None permission on creation repository
268 #User should have None permission on creation repository
269 self.assertEqual(UserModel().has_perm(user, perm_none), False)
269 self.assertEqual(UserModel().has_perm(user, perm_none), False)
270 self.assertEqual(UserModel().has_perm(user, perm_create), False)
270 self.assertEqual(UserModel().has_perm(user, perm_create), False)
271
271
272 response = self.app.post(url('edit_user_perms', id=uid),
272 response = self.app.post(url('edit_user_perms', id=uid),
273 params=dict(_method='put',
273 params=dict(_method='put',
274 create_repo_perm=True,
274 create_repo_perm=True,
275 _authentication_token=self.authentication_token()))
275 _authentication_token=self.authentication_token()))
276
276
277 perm_none = Permission.get_by_key('hg.create.none')
277 perm_none = Permission.get_by_key('hg.create.none')
278 perm_create = Permission.get_by_key('hg.create.repository')
278 perm_create = Permission.get_by_key('hg.create.repository')
279
279
280 #User should have None permission on creation repository
280 #User should have None permission on creation repository
281 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
281 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
282 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
282 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
283 finally:
283 finally:
284 UserModel().delete(uid)
284 UserModel().delete(uid)
285 Session().commit()
285 Session().commit()
286
286
287 def test_revoke_perm_create_repo(self):
287 def test_revoke_perm_create_repo(self):
288 self.log_user()
288 self.log_user()
289 perm_none = Permission.get_by_key('hg.create.none')
289 perm_none = Permission.get_by_key('hg.create.none')
290 perm_create = Permission.get_by_key('hg.create.repository')
290 perm_create = Permission.get_by_key('hg.create.repository')
291
291
292 user = UserModel().create_or_update(username='dummy', password='qwe',
292 user = UserModel().create_or_update(username='dummy', password='qwe',
293 email='dummy', firstname='a',
293 email='dummy', firstname='a',
294 lastname='b')
294 lastname='b')
295 Session().commit()
295 Session().commit()
296 uid = user.user_id
296 uid = user.user_id
297
297
298 try:
298 try:
299 #User should have None permission on creation repository
299 #User should have None permission on creation repository
300 self.assertEqual(UserModel().has_perm(user, perm_none), False)
300 self.assertEqual(UserModel().has_perm(user, perm_none), False)
301 self.assertEqual(UserModel().has_perm(user, perm_create), False)
301 self.assertEqual(UserModel().has_perm(user, perm_create), False)
302
302
303 response = self.app.post(url('edit_user_perms', id=uid),
303 response = self.app.post(url('edit_user_perms', id=uid),
304 params=dict(_method='put', _authentication_token=self.authentication_token()))
304 params=dict(_method='put', _authentication_token=self.authentication_token()))
305
305
306 perm_none = Permission.get_by_key('hg.create.none')
306 perm_none = Permission.get_by_key('hg.create.none')
307 perm_create = Permission.get_by_key('hg.create.repository')
307 perm_create = Permission.get_by_key('hg.create.repository')
308
308
309 #User should have None permission on creation repository
309 #User should have None permission on creation repository
310 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
310 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
311 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
311 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
312 finally:
312 finally:
313 UserModel().delete(uid)
313 UserModel().delete(uid)
314 Session().commit()
314 Session().commit()
315
315
316 def test_add_perm_fork_repo(self):
316 def test_add_perm_fork_repo(self):
317 self.log_user()
317 self.log_user()
318 perm_none = Permission.get_by_key('hg.fork.none')
318 perm_none = Permission.get_by_key('hg.fork.none')
319 perm_fork = Permission.get_by_key('hg.fork.repository')
319 perm_fork = Permission.get_by_key('hg.fork.repository')
320
320
321 user = UserModel().create_or_update(username='dummy', password='qwe',
321 user = UserModel().create_or_update(username='dummy', password='qwe',
322 email='dummy', firstname='a',
322 email='dummy', firstname='a',
323 lastname='b')
323 lastname='b')
324 Session().commit()
324 Session().commit()
325 uid = user.user_id
325 uid = user.user_id
326
326
327 try:
327 try:
328 #User should have None permission on creation repository
328 #User should have None permission on creation repository
329 self.assertEqual(UserModel().has_perm(user, perm_none), False)
329 self.assertEqual(UserModel().has_perm(user, perm_none), False)
330 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
330 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
331
331
332 response = self.app.post(url('edit_user_perms', id=uid),
332 response = self.app.post(url('edit_user_perms', id=uid),
333 params=dict(_method='put',
333 params=dict(_method='put',
334 create_repo_perm=True,
334 create_repo_perm=True,
335 _authentication_token=self.authentication_token()))
335 _authentication_token=self.authentication_token()))
336
336
337 perm_none = Permission.get_by_key('hg.create.none')
337 perm_none = Permission.get_by_key('hg.create.none')
338 perm_create = Permission.get_by_key('hg.create.repository')
338 perm_create = Permission.get_by_key('hg.create.repository')
339
339
340 #User should have None permission on creation repository
340 #User should have None permission on creation repository
341 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
341 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
342 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
342 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
343 finally:
343 finally:
344 UserModel().delete(uid)
344 UserModel().delete(uid)
345 Session().commit()
345 Session().commit()
346
346
347 def test_revoke_perm_fork_repo(self):
347 def test_revoke_perm_fork_repo(self):
348 self.log_user()
348 self.log_user()
349 perm_none = Permission.get_by_key('hg.fork.none')
349 perm_none = Permission.get_by_key('hg.fork.none')
350 perm_fork = Permission.get_by_key('hg.fork.repository')
350 perm_fork = Permission.get_by_key('hg.fork.repository')
351
351
352 user = UserModel().create_or_update(username='dummy', password='qwe',
352 user = UserModel().create_or_update(username='dummy', password='qwe',
353 email='dummy', firstname='a',
353 email='dummy', firstname='a',
354 lastname='b')
354 lastname='b')
355 Session().commit()
355 Session().commit()
356 uid = user.user_id
356 uid = user.user_id
357
357
358 try:
358 try:
359 #User should have None permission on creation repository
359 #User should have None permission on creation repository
360 self.assertEqual(UserModel().has_perm(user, perm_none), False)
360 self.assertEqual(UserModel().has_perm(user, perm_none), False)
361 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
361 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
362
362
363 response = self.app.post(url('edit_user_perms', id=uid),
363 response = self.app.post(url('edit_user_perms', id=uid),
364 params=dict(_method='put', _authentication_token=self.authentication_token()))
364 params=dict(_method='put', _authentication_token=self.authentication_token()))
365
365
366 perm_none = Permission.get_by_key('hg.create.none')
366 perm_none = Permission.get_by_key('hg.create.none')
367 perm_create = Permission.get_by_key('hg.create.repository')
367 perm_create = Permission.get_by_key('hg.create.repository')
368
368
369 #User should have None permission on creation repository
369 #User should have None permission on creation repository
370 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
370 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
371 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
371 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
372 finally:
372 finally:
373 UserModel().delete(uid)
373 UserModel().delete(uid)
374 Session().commit()
374 Session().commit()
375
375
376 def test_ips(self):
376 def test_ips(self):
377 self.log_user()
377 self.log_user()
378 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
378 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
379 response = self.app.get(url('edit_user_ips', id=user.user_id))
379 response = self.app.get(url('edit_user_ips', id=user.user_id))
380 response.mustcontain('All IP addresses are allowed')
380 response.mustcontain('All IP addresses are allowed')
381
381
382 @parameterized.expand([
382 @parameterized.expand([
383 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
383 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
384 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
384 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
385 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
385 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
386 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
386 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
387 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
387 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
388 ('127_bad_ip', 'foobar', 'foobar', True),
388 ('127_bad_ip', 'foobar', 'foobar', True),
389 ])
389 ])
390 def test_add_ip(self, test_name, ip, ip_range, failure):
390 def test_add_ip(self, test_name, ip, ip_range, failure):
391 self.log_user()
391 self.log_user()
392 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
392 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
393 user_id = user.user_id
393 user_id = user.user_id
394
394
395 response = self.app.put(url('edit_user_ips', id=user_id),
395 response = self.app.put(url('edit_user_ips', id=user_id),
396 params=dict(new_ip=ip, _authentication_token=self.authentication_token()))
396 params=dict(new_ip=ip, _authentication_token=self.authentication_token()))
397
397
398 if failure:
398 if failure:
399 self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
399 self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
400 response = self.app.get(url('edit_user_ips', id=user_id))
400 response = self.app.get(url('edit_user_ips', id=user_id))
401 response.mustcontain(no=[ip])
401 response.mustcontain(no=[ip])
402 response.mustcontain(no=[ip_range])
402 response.mustcontain(no=[ip_range])
403
403
404 else:
404 else:
405 response = self.app.get(url('edit_user_ips', id=user_id))
405 response = self.app.get(url('edit_user_ips', id=user_id))
406 response.mustcontain(ip)
406 response.mustcontain(ip)
407 response.mustcontain(ip_range)
407 response.mustcontain(ip_range)
408
408
409 ## cleanup
409 ## cleanup
410 for del_ip in UserIpMap.query().filter(UserIpMap.user_id == user_id).all():
410 for del_ip in UserIpMap.query().filter(UserIpMap.user_id == user_id).all():
411 Session().delete(del_ip)
411 Session().delete(del_ip)
412 Session().commit()
412 Session().commit()
413
413
414 def test_delete_ip(self):
414 def test_delete_ip(self):
415 self.log_user()
415 self.log_user()
416 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
416 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
417 user_id = user.user_id
417 user_id = user.user_id
418 ip = '127.0.0.1/32'
418 ip = '127.0.0.1/32'
419 ip_range = '127.0.0.1 - 127.0.0.1'
419 ip_range = '127.0.0.1 - 127.0.0.1'
420 new_ip = UserModel().add_extra_ip(user_id, ip)
420 new_ip = UserModel().add_extra_ip(user_id, ip)
421 Session().commit()
421 Session().commit()
422 new_ip_id = new_ip.ip_id
422 new_ip_id = new_ip.ip_id
423
423
424 response = self.app.get(url('edit_user_ips', id=user_id))
424 response = self.app.get(url('edit_user_ips', id=user_id))
425 response.mustcontain(ip)
425 response.mustcontain(ip)
426 response.mustcontain(ip_range)
426 response.mustcontain(ip_range)
427
427
428 self.app.post(url('edit_user_ips', id=user_id),
428 self.app.post(url('edit_user_ips', id=user_id),
429 params=dict(_method='delete', del_ip_id=new_ip_id, _authentication_token=self.authentication_token()))
429 params=dict(_method='delete', del_ip_id=new_ip_id, _authentication_token=self.authentication_token()))
430
430
431 response = self.app.get(url('edit_user_ips', id=user_id))
431 response = self.app.get(url('edit_user_ips', id=user_id))
432 response.mustcontain('All IP addresses are allowed')
432 response.mustcontain('All IP addresses are allowed')
433 response.mustcontain(no=[ip])
433 response.mustcontain(no=[ip])
434 response.mustcontain(no=[ip_range])
434 response.mustcontain(no=[ip_range])
435
435
436 def test_api_keys(self):
436 def test_api_keys(self):
437 self.log_user()
437 self.log_user()
438
438
439 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
439 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
440 response = self.app.get(url('edit_user_api_keys', id=user.user_id))
440 response = self.app.get(url('edit_user_api_keys', id=user.user_id))
441 response.mustcontain(user.api_key)
441 response.mustcontain(user.api_key)
442 response.mustcontain('Expires: Never')
442 response.mustcontain('Expires: Never')
443
443
444 @parameterized.expand([
444 @parameterized.expand([
445 ('forever', -1),
445 ('forever', -1),
446 ('5mins', 60*5),
446 ('5mins', 60*5),
447 ('30days', 60*60*24*30),
447 ('30days', 60*60*24*30),
448 ])
448 ])
449 def test_add_api_keys(self, desc, lifetime):
449 def test_add_api_keys(self, desc, lifetime):
450 self.log_user()
450 self.log_user()
451 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
451 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
452 user_id = user.user_id
452 user_id = user.user_id
453
453
454 response = self.app.post(url('edit_user_api_keys', id=user_id),
454 response = self.app.post(url('edit_user_api_keys', id=user_id),
455 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
455 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
456 self.checkSessionFlash(response, 'API key successfully created')
456 self.checkSessionFlash(response, 'API key successfully created')
457 try:
457 try:
458 response = response.follow()
458 response = response.follow()
459 user = User.get(user_id)
459 user = User.get(user_id)
460 for api_key in user.api_keys:
460 for api_key in user.api_keys:
461 response.mustcontain(api_key)
461 response.mustcontain(api_key)
462 finally:
462 finally:
463 for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
463 for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
464 Session().delete(api_key)
464 Session().delete(api_key)
465 Session().commit()
465 Session().commit()
466
466
467 def test_remove_api_key(self):
467 def test_remove_api_key(self):
468 self.log_user()
468 self.log_user()
469 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
469 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
470 user_id = user.user_id
470 user_id = user.user_id
471
471
472 response = self.app.post(url('edit_user_api_keys', id=user_id),
472 response = self.app.post(url('edit_user_api_keys', id=user_id),
473 {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
473 {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
474 self.checkSessionFlash(response, 'API key successfully created')
474 self.checkSessionFlash(response, 'API key successfully created')
475 response = response.follow()
475 response = response.follow()
476
476
477 #now delete our key
477 #now delete our key
478 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
478 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
479 self.assertEqual(1, len(keys))
479 self.assertEqual(1, len(keys))
480
480
481 response = self.app.post(url('edit_user_api_keys', id=user_id),
481 response = self.app.post(url('edit_user_api_keys', id=user_id),
482 {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
482 {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
483 self.checkSessionFlash(response, 'API key successfully deleted')
483 self.checkSessionFlash(response, 'API key successfully deleted')
484 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
484 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
485 self.assertEqual(0, len(keys))
485 self.assertEqual(0, len(keys))
486
486
487 def test_reset_main_api_key(self):
487 def test_reset_main_api_key(self):
488 self.log_user()
488 self.log_user()
489 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
489 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
490 user_id = user.user_id
490 user_id = user.user_id
491 api_key = user.api_key
491 api_key = user.api_key
492 response = self.app.get(url('edit_user_api_keys', id=user_id))
492 response = self.app.get(url('edit_user_api_keys', id=user_id))
493 response.mustcontain(api_key)
493 response.mustcontain(api_key)
494 response.mustcontain('Expires: Never')
494 response.mustcontain('Expires: Never')
495
495
496 response = self.app.post(url('edit_user_api_keys', id=user_id),
496 response = self.app.post(url('edit_user_api_keys', id=user_id),
497 {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
497 {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
498 self.checkSessionFlash(response, 'API key successfully reset')
498 self.checkSessionFlash(response, 'API key successfully reset')
499 response = response.follow()
499 response = response.follow()
500 response.mustcontain(no=[api_key])
500 response.mustcontain(no=[api_key])
501
501
502 # TODO To be uncommented when pytest is the test runner
502 # TODO To be uncommented when pytest is the test runner
503 #import pytest
503 #import pytest
504 #from kallithea.controllers.admin.users import UsersController
504 #from kallithea.controllers.admin.users import UsersController
505 #class TestAdminUsersController_unittest(object):
505 #class TestAdminUsersController_unittest(object):
506 # """
506 # """
507 # Unit tests for the users controller
507 # Unit tests for the users controller
508 # These are in a separate class, not deriving from TestController (and thus
508 # These are in a separate class, not deriving from TestController (and thus
509 # unittest.TestCase), to be able to benefit from pytest features like
509 # unittest.TestCase), to be able to benefit from pytest features like
510 # monkeypatch.
510 # monkeypatch.
511 # """
511 # """
512 # def test_get_user_or_raise_if_default(self, monkeypatch):
512 # def test_get_user_or_raise_if_default(self, monkeypatch):
513 # # flash complains about an unexisting session
513 # # flash complains about an unexisting session
514 # def flash_mock(*args, **kwargs):
514 # def flash_mock(*args, **kwargs):
515 # pass
515 # pass
516 # monkeypatch.setattr(h, 'flash', flash_mock)
516 # monkeypatch.setattr(h, 'flash', flash_mock)
517 #
517 #
518 # u = UsersController()
518 # u = UsersController()
519 # # a regular user should work correctly
519 # # a regular user should work correctly
520 # user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
520 # user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
521 # assert u._get_user_or_raise_if_default(user.user_id) == user
521 # assert u._get_user_or_raise_if_default(user.user_id) == user
522 # # the default user should raise
522 # # the default user should raise
523 # with pytest.raises(HTTPNotFound):
523 # with pytest.raises(HTTPNotFound):
524 # u._get_user_or_raise_if_default(User.get_default_user().user_id)
524 # u._get_user_or_raise_if_default(User.get_default_user().user_id)
525
525
526
526
527 class TestAdminUsersControllerForDefaultUser(TestController):
527 class TestAdminUsersControllerForDefaultUser(TestController):
528 """
528 """
529 Edit actions on the default user are not allowed.
529 Edit actions on the default user are not allowed.
530 Validate that they throw a 404 exception.
530 Validate that they throw a 404 exception.
531 """
531 """
532 def test_edit_default_user(self):
532 def test_edit_default_user(self):
533 self.log_user()
533 self.log_user()
534 user = User.get_default_user()
534 user = User.get_default_user()
535 response = self.app.get(url('edit_user', id=user.user_id), status=404)
535 response = self.app.get(url('edit_user', id=user.user_id), status=404)
536
536
537 def test_edit_advanced_default_user(self):
537 def test_edit_advanced_default_user(self):
538 self.log_user()
538 self.log_user()
539 user = User.get_default_user()
539 user = User.get_default_user()
540 response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
540 response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
541
541
542 # API keys
542 # API keys
543 def test_edit_api_keys_default_user(self):
543 def test_edit_api_keys_default_user(self):
544 self.log_user()
544 self.log_user()
545 user = User.get_default_user()
545 user = User.get_default_user()
546 response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
546 response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
547
547
548 def test_add_api_keys_default_user(self):
548 def test_add_api_keys_default_user(self):
549 self.log_user()
549 self.log_user()
550 user = User.get_default_user()
550 user = User.get_default_user()
551 response = self.app.post(url('edit_user_api_keys', id=user.user_id),
551 response = self.app.post(url('edit_user_api_keys', id=user.user_id),
552 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
552 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
553
553
554 def test_delete_api_keys_default_user(self):
554 def test_delete_api_keys_default_user(self):
555 self.log_user()
555 self.log_user()
556 user = User.get_default_user()
556 user = User.get_default_user()
557 response = self.app.post(url('edit_user_api_keys', id=user.user_id),
557 response = self.app.post(url('edit_user_api_keys', id=user.user_id),
558 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
558 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
559
559
560 # Permissions
560 # Permissions
561 def test_edit_perms_default_user(self):
561 def test_edit_perms_default_user(self):
562 self.log_user()
562 self.log_user()
563 user = User.get_default_user()
563 user = User.get_default_user()
564 response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
564 response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
565
565
566 def test_update_perms_default_user(self):
567 self.log_user()
568 user = User.get_default_user()
569 response = self.app.post(url('edit_user_perms', id=user.user_id),
570 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
571
566 # E-mails
572 # E-mails
567 def test_edit_emails_default_user(self):
573 def test_edit_emails_default_user(self):
568 self.log_user()
574 self.log_user()
569 user = User.get_default_user()
575 user = User.get_default_user()
570 response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
576 response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
571
577
578 def test_add_emails_default_user(self):
579 self.log_user()
580 user = User.get_default_user()
581 response = self.app.post(url('edit_user_emails', id=user.user_id),
582 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
583
584 def test_delete_emails_default_user(self):
585 self.log_user()
586 user = User.get_default_user()
587 response = self.app.post(url('edit_user_emails', id=user.user_id),
588 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
589
572 # IP addresses
590 # IP addresses
573 # Add/delete of IP addresses for the default user is used to maintain
591 # Add/delete of IP addresses for the default user is used to maintain
574 # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
592 # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
575 def test_edit_ip_default_user(self):
593 def test_edit_ip_default_user(self):
576 self.log_user()
594 self.log_user()
577 user = User.get_default_user()
595 user = User.get_default_user()
578 response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
596 response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
General Comments 0
You need to be logged in to leave comments. Login now