##// END OF EJS Templates
auth-tokens: remove showing builtin tokens
marcink -
r1479:7da2c024 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,468 +1,461 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2013-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 my account controller for RhodeCode admin
24 24 """
25 25
26 26 import logging
27 27 import datetime
28 28
29 29 import formencode
30 30 from formencode import htmlfill
31 31 from pyramid.threadlocal import get_current_registry
32 32 from pylons import request, tmpl_context as c, url, session
33 33 from pylons.controllers.util import redirect
34 34 from pylons.i18n.translation import _
35 35 from sqlalchemy.orm import joinedload
36 36 from webob.exc import HTTPBadGateway
37 37
38 38 from rhodecode import forms
39 39 from rhodecode.lib import helpers as h
40 40 from rhodecode.lib import auth
41 41 from rhodecode.lib.auth import (
42 42 LoginRequired, NotAnonymous, AuthUser, generate_auth_token)
43 43 from rhodecode.lib.base import BaseController, render
44 44 from rhodecode.lib.utils import jsonify
45 45 from rhodecode.lib.utils2 import safe_int, md5, str2bool
46 46 from rhodecode.lib.ext_json import json
47 47 from rhodecode.lib.channelstream import channelstream_request, \
48 48 ChannelstreamException
49 49
50 50 from rhodecode.model.validation_schema.schemas import user_schema
51 51 from rhodecode.model.db import (
52 52 Repository, PullRequest, UserEmailMap, User, UserFollowing)
53 53 from rhodecode.model.forms import UserForm
54 54 from rhodecode.model.scm import RepoList
55 55 from rhodecode.model.user import UserModel
56 56 from rhodecode.model.repo import RepoModel
57 57 from rhodecode.model.auth_token import AuthTokenModel
58 58 from rhodecode.model.meta import Session
59 59 from rhodecode.model.pull_request import PullRequestModel
60 60 from rhodecode.model.comment import CommentsModel
61 61
62 62 log = logging.getLogger(__name__)
63 63
64 64
65 65 class MyAccountController(BaseController):
66 66 """REST Controller styled on the Atom Publishing Protocol"""
67 67 # To properly map this controller, ensure your config/routing.py
68 68 # file has a resource setup:
69 69 # map.resource('setting', 'settings', controller='admin/settings',
70 70 # path_prefix='/admin', name_prefix='admin_')
71 71
72 72 @LoginRequired()
73 73 @NotAnonymous()
74 74 def __before__(self):
75 75 super(MyAccountController, self).__before__()
76 76
77 77 def __load_data(self):
78 78 c.user = User.get(c.rhodecode_user.user_id)
79 79 if c.user.username == User.DEFAULT_USER:
80 80 h.flash(_("You can't edit this user since it's"
81 81 " crucial for entire application"), category='warning')
82 82 return redirect(url('users'))
83 83
84 84 c.auth_user = AuthUser(
85 85 user_id=c.rhodecode_user.user_id, ip_addr=self.ip_addr)
86 86
87 87 def _load_my_repos_data(self, watched=False):
88 88 if watched:
89 89 admin = False
90 90 follows_repos = Session().query(UserFollowing)\
91 91 .filter(UserFollowing.user_id == c.rhodecode_user.user_id)\
92 92 .options(joinedload(UserFollowing.follows_repository))\
93 93 .all()
94 94 repo_list = [x.follows_repository for x in follows_repos]
95 95 else:
96 96 admin = True
97 97 repo_list = Repository.get_all_repos(
98 98 user_id=c.rhodecode_user.user_id)
99 99 repo_list = RepoList(repo_list, perm_set=[
100 100 'repository.read', 'repository.write', 'repository.admin'])
101 101
102 102 repos_data = RepoModel().get_repos_as_dict(
103 103 repo_list=repo_list, admin=admin)
104 104 # json used to render the grid
105 105 return json.dumps(repos_data)
106 106
107 107 @auth.CSRFRequired()
108 108 def my_account_update(self):
109 109 """
110 110 POST /_admin/my_account Updates info of my account
111 111 """
112 112 # url('my_account')
113 113 c.active = 'profile_edit'
114 114 self.__load_data()
115 115 c.perm_user = c.auth_user
116 116 c.extern_type = c.user.extern_type
117 117 c.extern_name = c.user.extern_name
118 118
119 119 defaults = c.user.get_dict()
120 120 update = False
121 121 _form = UserForm(edit=True,
122 122 old_data={'user_id': c.rhodecode_user.user_id,
123 123 'email': c.rhodecode_user.email})()
124 124 form_result = {}
125 125 try:
126 126 post_data = dict(request.POST)
127 127 post_data['new_password'] = ''
128 128 post_data['password_confirmation'] = ''
129 129 form_result = _form.to_python(post_data)
130 130 # skip updating those attrs for my account
131 131 skip_attrs = ['admin', 'active', 'extern_type', 'extern_name',
132 132 'new_password', 'password_confirmation']
133 133 # TODO: plugin should define if username can be updated
134 134 if c.extern_type != "rhodecode":
135 135 # forbid updating username for external accounts
136 136 skip_attrs.append('username')
137 137
138 138 UserModel().update_user(
139 139 c.rhodecode_user.user_id, skip_attrs=skip_attrs, **form_result)
140 140 h.flash(_('Your account was updated successfully'),
141 141 category='success')
142 142 Session().commit()
143 143 update = True
144 144
145 145 except formencode.Invalid as errors:
146 146 return htmlfill.render(
147 147 render('admin/my_account/my_account.mako'),
148 148 defaults=errors.value,
149 149 errors=errors.error_dict or {},
150 150 prefix_error=False,
151 151 encoding="UTF-8",
152 152 force_defaults=False)
153 153 except Exception:
154 154 log.exception("Exception updating user")
155 155 h.flash(_('Error occurred during update of user %s')
156 156 % form_result.get('username'), category='error')
157 157
158 158 if update:
159 159 return redirect('my_account')
160 160
161 161 return htmlfill.render(
162 162 render('admin/my_account/my_account.mako'),
163 163 defaults=defaults,
164 164 encoding="UTF-8",
165 165 force_defaults=False
166 166 )
167 167
168 168 def my_account(self):
169 169 """
170 170 GET /_admin/my_account Displays info about my account
171 171 """
172 172 # url('my_account')
173 173 c.active = 'profile'
174 174 self.__load_data()
175 175
176 176 defaults = c.user.get_dict()
177 177 return htmlfill.render(
178 178 render('admin/my_account/my_account.mako'),
179 179 defaults=defaults, encoding="UTF-8", force_defaults=False)
180 180
181 181 def my_account_edit(self):
182 182 """
183 183 GET /_admin/my_account/edit Displays edit form of my account
184 184 """
185 185 c.active = 'profile_edit'
186 186 self.__load_data()
187 187 c.perm_user = c.auth_user
188 188 c.extern_type = c.user.extern_type
189 189 c.extern_name = c.user.extern_name
190 190
191 191 defaults = c.user.get_dict()
192 192 return htmlfill.render(
193 193 render('admin/my_account/my_account.mako'),
194 194 defaults=defaults,
195 195 encoding="UTF-8",
196 196 force_defaults=False
197 197 )
198 198
199 199 @auth.CSRFRequired(except_methods=['GET'])
200 200 def my_account_password(self):
201 201 c.active = 'password'
202 202 self.__load_data()
203 203 c.extern_type = c.user.extern_type
204 204
205 205 schema = user_schema.ChangePasswordSchema().bind(
206 206 username=c.rhodecode_user.username)
207 207
208 208 form = forms.Form(schema,
209 209 buttons=(forms.buttons.save, forms.buttons.reset))
210 210
211 211 if request.method == 'POST' and c.extern_type == 'rhodecode':
212 212 controls = request.POST.items()
213 213 try:
214 214 valid_data = form.validate(controls)
215 215 UserModel().update_user(c.rhodecode_user.user_id, **valid_data)
216 216 instance = c.rhodecode_user.get_instance()
217 217 instance.update_userdata(force_password_change=False)
218 218 Session().commit()
219 219 except forms.ValidationFailure as e:
220 220 request.session.flash(
221 221 _('Error occurred during update of user password'),
222 222 queue='error')
223 223 form = e
224 224 except Exception:
225 225 log.exception("Exception updating password")
226 226 request.session.flash(
227 227 _('Error occurred during update of user password'),
228 228 queue='error')
229 229 else:
230 230 session.setdefault('rhodecode_user', {}).update(
231 231 {'password': md5(instance.password)})
232 232 session.save()
233 233 request.session.flash(
234 234 _("Successfully updated password"), queue='success')
235 235 return redirect(url('my_account_password'))
236 236
237 237 c.form = form
238 238 return render('admin/my_account/my_account.mako')
239 239
240 240 def my_account_repos(self):
241 241 c.active = 'repos'
242 242 self.__load_data()
243 243
244 244 # json used to render the grid
245 245 c.data = self._load_my_repos_data()
246 246 return render('admin/my_account/my_account.mako')
247 247
248 248 def my_account_watched(self):
249 249 c.active = 'watched'
250 250 self.__load_data()
251 251
252 252 # json used to render the grid
253 253 c.data = self._load_my_repos_data(watched=True)
254 254 return render('admin/my_account/my_account.mako')
255 255
256 256 def my_account_perms(self):
257 257 c.active = 'perms'
258 258 self.__load_data()
259 259 c.perm_user = c.auth_user
260 260
261 261 return render('admin/my_account/my_account.mako')
262 262
263 263 def my_account_emails(self):
264 264 c.active = 'emails'
265 265 self.__load_data()
266 266
267 267 c.user_email_map = UserEmailMap.query()\
268 268 .filter(UserEmailMap.user == c.user).all()
269 269 return render('admin/my_account/my_account.mako')
270 270
271 271 @auth.CSRFRequired()
272 272 def my_account_emails_add(self):
273 273 email = request.POST.get('new_email')
274 274
275 275 try:
276 276 UserModel().add_extra_email(c.rhodecode_user.user_id, email)
277 277 Session().commit()
278 278 h.flash(_("Added new email address `%s` for user account") % email,
279 279 category='success')
280 280 except formencode.Invalid as error:
281 281 msg = error.error_dict['email']
282 282 h.flash(msg, category='error')
283 283 except Exception:
284 284 log.exception("Exception in my_account_emails")
285 285 h.flash(_('An error occurred during email saving'),
286 286 category='error')
287 287 return redirect(url('my_account_emails'))
288 288
289 289 @auth.CSRFRequired()
290 290 def my_account_emails_delete(self):
291 291 email_id = request.POST.get('del_email_id')
292 292 user_model = UserModel()
293 293 user_model.delete_extra_email(c.rhodecode_user.user_id, email_id)
294 294 Session().commit()
295 295 h.flash(_("Removed email address from user account"),
296 296 category='success')
297 297 return redirect(url('my_account_emails'))
298 298
299 299 def _extract_ordering(self, request):
300 300 column_index = safe_int(request.GET.get('order[0][column]'))
301 301 order_dir = request.GET.get('order[0][dir]', 'desc')
302 302 order_by = request.GET.get(
303 303 'columns[%s][data][sort]' % column_index, 'name_raw')
304 304 return order_by, order_dir
305 305
306 306 def _get_pull_requests_list(self, statuses):
307 307 start = safe_int(request.GET.get('start'), 0)
308 308 length = safe_int(request.GET.get('length'), c.visual.dashboard_items)
309 309 order_by, order_dir = self._extract_ordering(request)
310 310
311 311 pull_requests = PullRequestModel().get_im_participating_in(
312 312 user_id=c.rhodecode_user.user_id,
313 313 statuses=statuses,
314 314 offset=start, length=length, order_by=order_by,
315 315 order_dir=order_dir)
316 316
317 317 pull_requests_total_count = PullRequestModel().count_im_participating_in(
318 318 user_id=c.rhodecode_user.user_id, statuses=statuses)
319 319
320 320 from rhodecode.lib.utils import PartialRenderer
321 321 _render = PartialRenderer('data_table/_dt_elements.mako')
322 322 data = []
323 323 for pr in pull_requests:
324 324 repo_id = pr.target_repo_id
325 325 comments = CommentsModel().get_all_comments(
326 326 repo_id, pull_request=pr)
327 327 owned = pr.user_id == c.rhodecode_user.user_id
328 328 status = pr.calculated_review_status()
329 329
330 330 data.append({
331 331 'target_repo': _render('pullrequest_target_repo',
332 332 pr.target_repo.repo_name),
333 333 'name': _render('pullrequest_name',
334 334 pr.pull_request_id, pr.target_repo.repo_name,
335 335 short=True),
336 336 'name_raw': pr.pull_request_id,
337 337 'status': _render('pullrequest_status', status),
338 338 'title': _render(
339 339 'pullrequest_title', pr.title, pr.description),
340 340 'description': h.escape(pr.description),
341 341 'updated_on': _render('pullrequest_updated_on',
342 342 h.datetime_to_time(pr.updated_on)),
343 343 'updated_on_raw': h.datetime_to_time(pr.updated_on),
344 344 'created_on': _render('pullrequest_updated_on',
345 345 h.datetime_to_time(pr.created_on)),
346 346 'created_on_raw': h.datetime_to_time(pr.created_on),
347 347 'author': _render('pullrequest_author',
348 348 pr.author.full_contact, ),
349 349 'author_raw': pr.author.full_name,
350 350 'comments': _render('pullrequest_comments', len(comments)),
351 351 'comments_raw': len(comments),
352 352 'closed': pr.is_closed(),
353 353 'owned': owned
354 354 })
355 355 # json used to render the grid
356 356 data = ({
357 357 'data': data,
358 358 'recordsTotal': pull_requests_total_count,
359 359 'recordsFiltered': pull_requests_total_count,
360 360 })
361 361 return data
362 362
363 363 def my_account_pullrequests(self):
364 364 c.active = 'pullrequests'
365 365 self.__load_data()
366 366 c.show_closed = str2bool(request.GET.get('pr_show_closed'))
367 367
368 368 statuses = [PullRequest.STATUS_NEW, PullRequest.STATUS_OPEN]
369 369 if c.show_closed:
370 370 statuses += [PullRequest.STATUS_CLOSED]
371 371 data = self._get_pull_requests_list(statuses)
372 372 if not request.is_xhr:
373 373 c.data_participate = json.dumps(data['data'])
374 374 c.records_total_participate = data['recordsTotal']
375 375 return render('admin/my_account/my_account.mako')
376 376 else:
377 377 return json.dumps(data)
378 378
379 379 def my_account_auth_tokens(self):
380 380 c.active = 'auth_tokens'
381 381 self.__load_data()
382 382 show_expired = True
383 383 c.lifetime_values = [
384 384 (str(-1), _('forever')),
385 385 (str(5), _('5 minutes')),
386 386 (str(60), _('1 hour')),
387 387 (str(60 * 24), _('1 day')),
388 388 (str(60 * 24 * 30), _('1 month')),
389 389 ]
390 390 c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
391 391 c.role_values = [(x, AuthTokenModel.cls._get_role_name(x))
392 392 for x in AuthTokenModel.cls.ROLES]
393 393 c.role_options = [(c.role_values, _("Role"))]
394 394 c.user_auth_tokens = AuthTokenModel().get_auth_tokens(
395 395 c.rhodecode_user.user_id, show_expired=show_expired)
396 396 return render('admin/my_account/my_account.mako')
397 397
398 398 @auth.CSRFRequired()
399 399 def my_account_auth_tokens_add(self):
400 400 lifetime = safe_int(request.POST.get('lifetime'), -1)
401 401 description = request.POST.get('description')
402 402 role = request.POST.get('role')
403 403 AuthTokenModel().create(c.rhodecode_user.user_id, description, lifetime,
404 404 role)
405 405 Session().commit()
406 406 h.flash(_("Auth token successfully created"), category='success')
407 407 return redirect(url('my_account_auth_tokens'))
408 408
409 409 @auth.CSRFRequired()
410 410 def my_account_auth_tokens_delete(self):
411 auth_token = request.POST.get('del_auth_token')
412 user_id = c.rhodecode_user.user_id
413 if request.POST.get('del_auth_token_builtin'):
414 user = User.get(user_id)
415 if user:
416 user.api_key = generate_auth_token(user.username)
417 Session().add(user)
418 Session().commit()
419 h.flash(_("Auth token successfully reset"), category='success')
420 elif auth_token:
421 AuthTokenModel().delete(auth_token, c.rhodecode_user.user_id)
411 del_auth_token = request.POST.get('del_auth_token')
412
413 if del_auth_token:
414 AuthTokenModel().delete(del_auth_token, c.rhodecode_user.user_id)
422 415 Session().commit()
423 416 h.flash(_("Auth token successfully deleted"), category='success')
424 417
425 418 return redirect(url('my_account_auth_tokens'))
426 419
427 420 def my_notifications(self):
428 421 c.active = 'notifications'
429 422 return render('admin/my_account/my_account.mako')
430 423
431 424 @auth.CSRFRequired()
432 425 @jsonify
433 426 def my_notifications_toggle_visibility(self):
434 427 user = c.rhodecode_user.get_instance()
435 428 new_status = not user.user_data.get('notification_status', True)
436 429 user.update_userdata(notification_status=new_status)
437 430 Session().commit()
438 431 return user.user_data['notification_status']
439 432
440 433 @auth.CSRFRequired()
441 434 @jsonify
442 435 def my_account_notifications_test_channelstream(self):
443 436 message = 'Test message sent via Channelstream by user: {}, on {}'.format(
444 437 c.rhodecode_user.username, datetime.datetime.now())
445 438 payload = {
446 439 'type': 'message',
447 440 'timestamp': datetime.datetime.utcnow(),
448 441 'user': 'system',
449 442 #'channel': 'broadcast',
450 443 'pm_users': [c.rhodecode_user.username],
451 444 'message': {
452 445 'message': message,
453 446 'level': 'info',
454 447 'topic': '/notifications'
455 448 }
456 449 }
457 450
458 451 registry = get_current_registry()
459 452 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
460 453 channelstream_config = rhodecode_plugins.get('channelstream', {})
461 454
462 455 try:
463 456 channelstream_request(channelstream_config, [payload], '/message')
464 457 except ChannelstreamException as e:
465 458 log.exception('Failed to send channelstream data')
466 459 return {"response": 'ERROR: {}'.format(e.__class__.__name__)}
467 460 return {"response": 'Channelstream data sent. '
468 461 'You should see a new live message now.'}
@@ -1,748 +1,741 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Users crud controller for pylons
23 23 """
24 24
25 25 import logging
26 26 import formencode
27 27
28 28 from formencode import htmlfill
29 29 from pylons import request, tmpl_context as c, url, config
30 30 from pylons.controllers.util import redirect
31 31 from pylons.i18n.translation import _
32 32
33 33 from rhodecode.authentication.plugins import auth_rhodecode
34 34 from rhodecode.lib.exceptions import (
35 35 DefaultUserException, UserOwnsReposException, UserOwnsRepoGroupsException,
36 36 UserOwnsUserGroupsException, UserCreationError)
37 37 from rhodecode.lib import helpers as h
38 38 from rhodecode.lib import auth
39 39 from rhodecode.lib.auth import (
40 40 LoginRequired, HasPermissionAllDecorator, AuthUser, generate_auth_token)
41 41 from rhodecode.lib.base import BaseController, render
42 42 from rhodecode.model.auth_token import AuthTokenModel
43 43
44 44 from rhodecode.model.db import (
45 45 PullRequestReviewers, User, UserEmailMap, UserIpMap, RepoGroup)
46 46 from rhodecode.model.forms import (
47 47 UserForm, UserPermissionsForm, UserIndividualPermissionsForm)
48 48 from rhodecode.model.repo_group import RepoGroupModel
49 49 from rhodecode.model.user import UserModel
50 50 from rhodecode.model.meta import Session
51 51 from rhodecode.model.permission import PermissionModel
52 52 from rhodecode.lib.utils import action_logger
53 53 from rhodecode.lib.ext_json import json
54 54 from rhodecode.lib.utils2 import datetime_to_time, safe_int, AttributeDict
55 55
56 56 log = logging.getLogger(__name__)
57 57
58 58
59 59 class UsersController(BaseController):
60 60 """REST Controller styled on the Atom Publishing Protocol"""
61 61
62 62 @LoginRequired()
63 63 def __before__(self):
64 64 super(UsersController, self).__before__()
65 65 c.available_permissions = config['available_permissions']
66 66 c.allowed_languages = [
67 67 ('en', 'English (en)'),
68 68 ('de', 'German (de)'),
69 69 ('fr', 'French (fr)'),
70 70 ('it', 'Italian (it)'),
71 71 ('ja', 'Japanese (ja)'),
72 72 ('pl', 'Polish (pl)'),
73 73 ('pt', 'Portuguese (pt)'),
74 74 ('ru', 'Russian (ru)'),
75 75 ('zh', 'Chinese (zh)'),
76 76 ]
77 77 PermissionModel().set_global_permission_choices(c, gettext_translator=_)
78 78
79 79 @HasPermissionAllDecorator('hg.admin')
80 80 def index(self):
81 81 """GET /users: All items in the collection"""
82 82 # url('users')
83 83
84 84 from rhodecode.lib.utils import PartialRenderer
85 85 _render = PartialRenderer('data_table/_dt_elements.mako')
86 86
87 87 def username(user_id, username):
88 88 return _render("user_name", user_id, username)
89 89
90 90 def user_actions(user_id, username):
91 91 return _render("user_actions", user_id, username)
92 92
93 93 # json generate
94 94 c.users_list = User.query()\
95 95 .filter(User.username != User.DEFAULT_USER) \
96 96 .all()
97 97
98 98 users_data = []
99 99 for user in c.users_list:
100 100 users_data.append({
101 101 "username": h.gravatar_with_user(user.username),
102 102 "username_raw": user.username,
103 103 "email": user.email,
104 104 "first_name": h.escape(user.name),
105 105 "last_name": h.escape(user.lastname),
106 106 "last_login": h.format_date(user.last_login),
107 107 "last_login_raw": datetime_to_time(user.last_login),
108 108 "last_activity": h.format_date(
109 109 h.time_to_datetime(user.user_data.get('last_activity', 0))),
110 110 "last_activity_raw": user.user_data.get('last_activity', 0),
111 111 "active": h.bool2icon(user.active),
112 112 "active_raw": user.active,
113 113 "admin": h.bool2icon(user.admin),
114 114 "admin_raw": user.admin,
115 115 "extern_type": user.extern_type,
116 116 "extern_name": user.extern_name,
117 117 "action": user_actions(user.user_id, user.username),
118 118 })
119 119
120 120
121 121 c.data = json.dumps(users_data)
122 122 return render('admin/users/users.mako')
123 123
124 124 def _get_personal_repo_group_template_vars(self):
125 125 DummyUser = AttributeDict({
126 126 'username': '${username}',
127 127 'user_id': '${user_id}',
128 128 })
129 129 c.default_create_repo_group = RepoGroupModel() \
130 130 .get_default_create_personal_repo_group()
131 131 c.personal_repo_group_name = RepoGroupModel() \
132 132 .get_personal_group_name(DummyUser)
133 133
134 134 @HasPermissionAllDecorator('hg.admin')
135 135 @auth.CSRFRequired()
136 136 def create(self):
137 137 """POST /users: Create a new item"""
138 138 # url('users')
139 139 c.default_extern_type = auth_rhodecode.RhodeCodeAuthPlugin.name
140 140 user_model = UserModel()
141 141 user_form = UserForm()()
142 142 try:
143 143 form_result = user_form.to_python(dict(request.POST))
144 144 user = user_model.create(form_result)
145 145 Session().flush()
146 146 username = form_result['username']
147 147 action_logger(c.rhodecode_user, 'admin_created_user:%s' % username,
148 148 None, self.ip_addr, self.sa)
149 149
150 150 user_link = h.link_to(h.escape(username),
151 151 url('edit_user',
152 152 user_id=user.user_id))
153 153 h.flash(h.literal(_('Created user %(user_link)s')
154 154 % {'user_link': user_link}), category='success')
155 155 Session().commit()
156 156 except formencode.Invalid as errors:
157 157 self._get_personal_repo_group_template_vars()
158 158 return htmlfill.render(
159 159 render('admin/users/user_add.mako'),
160 160 defaults=errors.value,
161 161 errors=errors.error_dict or {},
162 162 prefix_error=False,
163 163 encoding="UTF-8",
164 164 force_defaults=False)
165 165 except UserCreationError as e:
166 166 h.flash(e, 'error')
167 167 except Exception:
168 168 log.exception("Exception creation of user")
169 169 h.flash(_('Error occurred during creation of user %s')
170 170 % request.POST.get('username'), category='error')
171 171 return redirect(url('users'))
172 172
173 173 @HasPermissionAllDecorator('hg.admin')
174 174 def new(self):
175 175 """GET /users/new: Form to create a new item"""
176 176 # url('new_user')
177 177 c.default_extern_type = auth_rhodecode.RhodeCodeAuthPlugin.name
178 178 self._get_personal_repo_group_template_vars()
179 179 return render('admin/users/user_add.mako')
180 180
181 181 @HasPermissionAllDecorator('hg.admin')
182 182 @auth.CSRFRequired()
183 183 def update(self, user_id):
184 184 """PUT /users/user_id: Update an existing item"""
185 185 # Forms posted to this method should contain a hidden field:
186 186 # <input type="hidden" name="_method" value="PUT" />
187 187 # Or using helpers:
188 188 # h.form(url('update_user', user_id=ID),
189 189 # method='put')
190 190 # url('user', user_id=ID)
191 191 user_id = safe_int(user_id)
192 192 c.user = User.get_or_404(user_id)
193 193 c.active = 'profile'
194 194 c.extern_type = c.user.extern_type
195 195 c.extern_name = c.user.extern_name
196 196 c.perm_user = AuthUser(user_id=user_id, ip_addr=self.ip_addr)
197 197 available_languages = [x[0] for x in c.allowed_languages]
198 198 _form = UserForm(edit=True, available_languages=available_languages,
199 199 old_data={'user_id': user_id,
200 200 'email': c.user.email})()
201 201 form_result = {}
202 202 try:
203 203 form_result = _form.to_python(dict(request.POST))
204 204 skip_attrs = ['extern_type', 'extern_name']
205 205 # TODO: plugin should define if username can be updated
206 206 if c.extern_type != "rhodecode":
207 207 # forbid updating username for external accounts
208 208 skip_attrs.append('username')
209 209
210 210 UserModel().update_user(user_id, skip_attrs=skip_attrs, **form_result)
211 211 usr = form_result['username']
212 212 action_logger(c.rhodecode_user, 'admin_updated_user:%s' % usr,
213 213 None, self.ip_addr, self.sa)
214 214 h.flash(_('User updated successfully'), category='success')
215 215 Session().commit()
216 216 except formencode.Invalid as errors:
217 217 defaults = errors.value
218 218 e = errors.error_dict or {}
219 219
220 220 return htmlfill.render(
221 221 render('admin/users/user_edit.mako'),
222 222 defaults=defaults,
223 223 errors=e,
224 224 prefix_error=False,
225 225 encoding="UTF-8",
226 226 force_defaults=False)
227 227 except UserCreationError as e:
228 228 h.flash(e, 'error')
229 229 except Exception:
230 230 log.exception("Exception updating user")
231 231 h.flash(_('Error occurred during update of user %s')
232 232 % form_result.get('username'), category='error')
233 233 return redirect(url('edit_user', user_id=user_id))
234 234
235 235 @HasPermissionAllDecorator('hg.admin')
236 236 @auth.CSRFRequired()
237 237 def delete(self, user_id):
238 238 """DELETE /users/user_id: Delete an existing item"""
239 239 # Forms posted to this method should contain a hidden field:
240 240 # <input type="hidden" name="_method" value="DELETE" />
241 241 # Or using helpers:
242 242 # h.form(url('delete_user', user_id=ID),
243 243 # method='delete')
244 244 # url('user', user_id=ID)
245 245 user_id = safe_int(user_id)
246 246 c.user = User.get_or_404(user_id)
247 247
248 248 _repos = c.user.repositories
249 249 _repo_groups = c.user.repository_groups
250 250 _user_groups = c.user.user_groups
251 251
252 252 handle_repos = None
253 253 handle_repo_groups = None
254 254 handle_user_groups = None
255 255 # dummy call for flash of handle
256 256 set_handle_flash_repos = lambda: None
257 257 set_handle_flash_repo_groups = lambda: None
258 258 set_handle_flash_user_groups = lambda: None
259 259
260 260 if _repos and request.POST.get('user_repos'):
261 261 do = request.POST['user_repos']
262 262 if do == 'detach':
263 263 handle_repos = 'detach'
264 264 set_handle_flash_repos = lambda: h.flash(
265 265 _('Detached %s repositories') % len(_repos),
266 266 category='success')
267 267 elif do == 'delete':
268 268 handle_repos = 'delete'
269 269 set_handle_flash_repos = lambda: h.flash(
270 270 _('Deleted %s repositories') % len(_repos),
271 271 category='success')
272 272
273 273 if _repo_groups and request.POST.get('user_repo_groups'):
274 274 do = request.POST['user_repo_groups']
275 275 if do == 'detach':
276 276 handle_repo_groups = 'detach'
277 277 set_handle_flash_repo_groups = lambda: h.flash(
278 278 _('Detached %s repository groups') % len(_repo_groups),
279 279 category='success')
280 280 elif do == 'delete':
281 281 handle_repo_groups = 'delete'
282 282 set_handle_flash_repo_groups = lambda: h.flash(
283 283 _('Deleted %s repository groups') % len(_repo_groups),
284 284 category='success')
285 285
286 286 if _user_groups and request.POST.get('user_user_groups'):
287 287 do = request.POST['user_user_groups']
288 288 if do == 'detach':
289 289 handle_user_groups = 'detach'
290 290 set_handle_flash_user_groups = lambda: h.flash(
291 291 _('Detached %s user groups') % len(_user_groups),
292 292 category='success')
293 293 elif do == 'delete':
294 294 handle_user_groups = 'delete'
295 295 set_handle_flash_user_groups = lambda: h.flash(
296 296 _('Deleted %s user groups') % len(_user_groups),
297 297 category='success')
298 298
299 299 try:
300 300 UserModel().delete(c.user, handle_repos=handle_repos,
301 301 handle_repo_groups=handle_repo_groups,
302 302 handle_user_groups=handle_user_groups)
303 303 Session().commit()
304 304 set_handle_flash_repos()
305 305 set_handle_flash_repo_groups()
306 306 set_handle_flash_user_groups()
307 307 h.flash(_('Successfully deleted user'), category='success')
308 308 except (UserOwnsReposException, UserOwnsRepoGroupsException,
309 309 UserOwnsUserGroupsException, DefaultUserException) as e:
310 310 h.flash(e, category='warning')
311 311 except Exception:
312 312 log.exception("Exception during deletion of user")
313 313 h.flash(_('An error occurred during deletion of user'),
314 314 category='error')
315 315 return redirect(url('users'))
316 316
317 317 @HasPermissionAllDecorator('hg.admin')
318 318 @auth.CSRFRequired()
319 319 def reset_password(self, user_id):
320 320 """
321 321 toggle reset password flag for this user
322 322
323 323 :param user_id:
324 324 """
325 325 user_id = safe_int(user_id)
326 326 c.user = User.get_or_404(user_id)
327 327 try:
328 328 old_value = c.user.user_data.get('force_password_change')
329 329 c.user.update_userdata(force_password_change=not old_value)
330 330 Session().commit()
331 331 if old_value:
332 332 msg = _('Force password change disabled for user')
333 333 else:
334 334 msg = _('Force password change enabled for user')
335 335 h.flash(msg, category='success')
336 336 except Exception:
337 337 log.exception("Exception during password reset for user")
338 338 h.flash(_('An error occurred during password reset for user'),
339 339 category='error')
340 340
341 341 return redirect(url('edit_user_advanced', user_id=user_id))
342 342
343 343 @HasPermissionAllDecorator('hg.admin')
344 344 @auth.CSRFRequired()
345 345 def create_personal_repo_group(self, user_id):
346 346 """
347 347 Create personal repository group for this user
348 348
349 349 :param user_id:
350 350 """
351 351 from rhodecode.model.repo_group import RepoGroupModel
352 352
353 353 user_id = safe_int(user_id)
354 354 c.user = User.get_or_404(user_id)
355 355 personal_repo_group = RepoGroup.get_user_personal_repo_group(
356 356 c.user.user_id)
357 357 if personal_repo_group:
358 358 return redirect(url('edit_user_advanced', user_id=user_id))
359 359
360 360 personal_repo_group_name = RepoGroupModel().get_personal_group_name(
361 361 c.user)
362 362 named_personal_group = RepoGroup.get_by_group_name(
363 363 personal_repo_group_name)
364 364 try:
365 365
366 366 if named_personal_group and named_personal_group.user_id == c.user.user_id:
367 367 # migrate the same named group, and mark it as personal
368 368 named_personal_group.personal = True
369 369 Session().add(named_personal_group)
370 370 Session().commit()
371 371 msg = _('Linked repository group `%s` as personal' % (
372 372 personal_repo_group_name,))
373 373 h.flash(msg, category='success')
374 374 elif not named_personal_group:
375 375 RepoGroupModel().create_personal_repo_group(c.user)
376 376
377 377 msg = _('Created repository group `%s`' % (
378 378 personal_repo_group_name,))
379 379 h.flash(msg, category='success')
380 380 else:
381 381 msg = _('Repository group `%s` is already taken' % (
382 382 personal_repo_group_name,))
383 383 h.flash(msg, category='warning')
384 384 except Exception:
385 385 log.exception("Exception during repository group creation")
386 386 msg = _(
387 387 'An error occurred during repository group creation for user')
388 388 h.flash(msg, category='error')
389 389 Session().rollback()
390 390
391 391 return redirect(url('edit_user_advanced', user_id=user_id))
392 392
393 393 @HasPermissionAllDecorator('hg.admin')
394 394 def show(self, user_id):
395 395 """GET /users/user_id: Show a specific item"""
396 396 # url('user', user_id=ID)
397 397 User.get_or_404(-1)
398 398
399 399 @HasPermissionAllDecorator('hg.admin')
400 400 def edit(self, user_id):
401 401 """GET /users/user_id/edit: Form to edit an existing item"""
402 402 # url('edit_user', user_id=ID)
403 403 user_id = safe_int(user_id)
404 404 c.user = User.get_or_404(user_id)
405 405 if c.user.username == User.DEFAULT_USER:
406 406 h.flash(_("You can't edit this user"), category='warning')
407 407 return redirect(url('users'))
408 408
409 409 c.active = 'profile'
410 410 c.extern_type = c.user.extern_type
411 411 c.extern_name = c.user.extern_name
412 412 c.perm_user = AuthUser(user_id=user_id, ip_addr=self.ip_addr)
413 413
414 414 defaults = c.user.get_dict()
415 415 defaults.update({'language': c.user.user_data.get('language')})
416 416 return htmlfill.render(
417 417 render('admin/users/user_edit.mako'),
418 418 defaults=defaults,
419 419 encoding="UTF-8",
420 420 force_defaults=False)
421 421
422 422 @HasPermissionAllDecorator('hg.admin')
423 423 def edit_advanced(self, user_id):
424 424 user_id = safe_int(user_id)
425 425 user = c.user = User.get_or_404(user_id)
426 426 if user.username == User.DEFAULT_USER:
427 427 h.flash(_("You can't edit this user"), category='warning')
428 428 return redirect(url('users'))
429 429
430 430 c.active = 'advanced'
431 431 c.perm_user = AuthUser(user_id=user_id, ip_addr=self.ip_addr)
432 432 c.personal_repo_group = c.perm_user.personal_repo_group
433 433 c.personal_repo_group_name = RepoGroupModel()\
434 434 .get_personal_group_name(user)
435 435 c.first_admin = User.get_first_super_admin()
436 436 defaults = user.get_dict()
437 437
438 438 # Interim workaround if the user participated on any pull requests as a
439 439 # reviewer.
440 440 has_review = bool(PullRequestReviewers.query().filter(
441 441 PullRequestReviewers.user_id == user_id).first())
442 442 c.can_delete_user = not has_review
443 443 c.can_delete_user_message = _(
444 444 'The user participates as reviewer in pull requests and '
445 445 'cannot be deleted. You can set the user to '
446 446 '"inactive" instead of deleting it.') if has_review else ''
447 447
448 448 return htmlfill.render(
449 449 render('admin/users/user_edit.mako'),
450 450 defaults=defaults,
451 451 encoding="UTF-8",
452 452 force_defaults=False)
453 453
454 454 @HasPermissionAllDecorator('hg.admin')
455 455 def edit_auth_tokens(self, user_id):
456 456 user_id = safe_int(user_id)
457 457 c.user = User.get_or_404(user_id)
458 458 if c.user.username == User.DEFAULT_USER:
459 459 h.flash(_("You can't edit this user"), category='warning')
460 460 return redirect(url('users'))
461 461
462 462 c.active = 'auth_tokens'
463 463 show_expired = True
464 464 c.lifetime_values = [
465 465 (str(-1), _('forever')),
466 466 (str(5), _('5 minutes')),
467 467 (str(60), _('1 hour')),
468 468 (str(60 * 24), _('1 day')),
469 469 (str(60 * 24 * 30), _('1 month')),
470 470 ]
471 471 c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
472 472 c.role_values = [(x, AuthTokenModel.cls._get_role_name(x))
473 473 for x in AuthTokenModel.cls.ROLES]
474 474 c.role_options = [(c.role_values, _("Role"))]
475 475 c.user_auth_tokens = AuthTokenModel().get_auth_tokens(
476 476 c.user.user_id, show_expired=show_expired)
477 477 defaults = c.user.get_dict()
478 478 return htmlfill.render(
479 479 render('admin/users/user_edit.mako'),
480 480 defaults=defaults,
481 481 encoding="UTF-8",
482 482 force_defaults=False)
483 483
484 484 @HasPermissionAllDecorator('hg.admin')
485 485 @auth.CSRFRequired()
486 486 def add_auth_token(self, user_id):
487 487 user_id = safe_int(user_id)
488 488 c.user = User.get_or_404(user_id)
489 489 if c.user.username == User.DEFAULT_USER:
490 490 h.flash(_("You can't edit this user"), category='warning')
491 491 return redirect(url('users'))
492 492
493 493 lifetime = safe_int(request.POST.get('lifetime'), -1)
494 494 description = request.POST.get('description')
495 495 role = request.POST.get('role')
496 496 AuthTokenModel().create(c.user.user_id, description, lifetime, role)
497 497 Session().commit()
498 498 h.flash(_("Auth token successfully created"), category='success')
499 499 return redirect(url('edit_user_auth_tokens', user_id=c.user.user_id))
500 500
501 501 @HasPermissionAllDecorator('hg.admin')
502 502 @auth.CSRFRequired()
503 503 def delete_auth_token(self, user_id):
504 504 user_id = safe_int(user_id)
505 505 c.user = User.get_or_404(user_id)
506 506 if c.user.username == User.DEFAULT_USER:
507 507 h.flash(_("You can't edit this user"), category='warning')
508 508 return redirect(url('users'))
509 509
510 auth_token = request.POST.get('del_auth_token')
511 if request.POST.get('del_auth_token_builtin'):
512 user = User.get(c.user.user_id)
513 if user:
514 user.api_key = generate_auth_token(user.username)
515 Session().add(user)
516 Session().commit()
517 h.flash(_("Auth token successfully reset"), category='success')
518 elif auth_token:
519 AuthTokenModel().delete(auth_token, c.user.user_id)
510 del_auth_token = request.POST.get('del_auth_token')
511 if del_auth_token:
512 AuthTokenModel().delete(del_auth_token, c.user.user_id)
520 513 Session().commit()
521 514 h.flash(_("Auth token successfully deleted"), category='success')
522 515
523 516 return redirect(url('edit_user_auth_tokens', user_id=c.user.user_id))
524 517
525 518 @HasPermissionAllDecorator('hg.admin')
526 519 def edit_global_perms(self, user_id):
527 520 user_id = safe_int(user_id)
528 521 c.user = User.get_or_404(user_id)
529 522 if c.user.username == User.DEFAULT_USER:
530 523 h.flash(_("You can't edit this user"), category='warning')
531 524 return redirect(url('users'))
532 525
533 526 c.active = 'global_perms'
534 527
535 528 c.default_user = User.get_default_user()
536 529 defaults = c.user.get_dict()
537 530 defaults.update(c.default_user.get_default_perms(suffix='_inherited'))
538 531 defaults.update(c.default_user.get_default_perms())
539 532 defaults.update(c.user.get_default_perms())
540 533
541 534 return htmlfill.render(
542 535 render('admin/users/user_edit.mako'),
543 536 defaults=defaults,
544 537 encoding="UTF-8",
545 538 force_defaults=False)
546 539
547 540 @HasPermissionAllDecorator('hg.admin')
548 541 @auth.CSRFRequired()
549 542 def update_global_perms(self, user_id):
550 543 """PUT /users_perm/user_id: Update an existing item"""
551 544 # url('user_perm', user_id=ID, method='put')
552 545 user_id = safe_int(user_id)
553 546 user = User.get_or_404(user_id)
554 547 c.active = 'global_perms'
555 548 try:
556 549 # first stage that verifies the checkbox
557 550 _form = UserIndividualPermissionsForm()
558 551 form_result = _form.to_python(dict(request.POST))
559 552 inherit_perms = form_result['inherit_default_permissions']
560 553 user.inherit_default_permissions = inherit_perms
561 554 Session().add(user)
562 555
563 556 if not inherit_perms:
564 557 # only update the individual ones if we un check the flag
565 558 _form = UserPermissionsForm(
566 559 [x[0] for x in c.repo_create_choices],
567 560 [x[0] for x in c.repo_create_on_write_choices],
568 561 [x[0] for x in c.repo_group_create_choices],
569 562 [x[0] for x in c.user_group_create_choices],
570 563 [x[0] for x in c.fork_choices],
571 564 [x[0] for x in c.inherit_default_permission_choices])()
572 565
573 566 form_result = _form.to_python(dict(request.POST))
574 567 form_result.update({'perm_user_id': user.user_id})
575 568
576 569 PermissionModel().update_user_permissions(form_result)
577 570
578 571 Session().commit()
579 572 h.flash(_('User global permissions updated successfully'),
580 573 category='success')
581 574
582 575 Session().commit()
583 576 except formencode.Invalid as errors:
584 577 defaults = errors.value
585 578 c.user = user
586 579 return htmlfill.render(
587 580 render('admin/users/user_edit.mako'),
588 581 defaults=defaults,
589 582 errors=errors.error_dict or {},
590 583 prefix_error=False,
591 584 encoding="UTF-8",
592 585 force_defaults=False)
593 586 except Exception:
594 587 log.exception("Exception during permissions saving")
595 588 h.flash(_('An error occurred during permissions saving'),
596 589 category='error')
597 590 return redirect(url('edit_user_global_perms', user_id=user_id))
598 591
599 592 @HasPermissionAllDecorator('hg.admin')
600 593 def edit_perms_summary(self, user_id):
601 594 user_id = safe_int(user_id)
602 595 c.user = User.get_or_404(user_id)
603 596 if c.user.username == User.DEFAULT_USER:
604 597 h.flash(_("You can't edit this user"), category='warning')
605 598 return redirect(url('users'))
606 599
607 600 c.active = 'perms_summary'
608 601 c.perm_user = AuthUser(user_id=user_id, ip_addr=self.ip_addr)
609 602
610 603 return render('admin/users/user_edit.mako')
611 604
612 605 @HasPermissionAllDecorator('hg.admin')
613 606 def edit_emails(self, user_id):
614 607 user_id = safe_int(user_id)
615 608 c.user = User.get_or_404(user_id)
616 609 if c.user.username == User.DEFAULT_USER:
617 610 h.flash(_("You can't edit this user"), category='warning')
618 611 return redirect(url('users'))
619 612
620 613 c.active = 'emails'
621 614 c.user_email_map = UserEmailMap.query() \
622 615 .filter(UserEmailMap.user == c.user).all()
623 616
624 617 defaults = c.user.get_dict()
625 618 return htmlfill.render(
626 619 render('admin/users/user_edit.mako'),
627 620 defaults=defaults,
628 621 encoding="UTF-8",
629 622 force_defaults=False)
630 623
631 624 @HasPermissionAllDecorator('hg.admin')
632 625 @auth.CSRFRequired()
633 626 def add_email(self, user_id):
634 627 """POST /user_emails:Add an existing item"""
635 628 # url('user_emails', user_id=ID, method='put')
636 629 user_id = safe_int(user_id)
637 630 c.user = User.get_or_404(user_id)
638 631
639 632 email = request.POST.get('new_email')
640 633 user_model = UserModel()
641 634
642 635 try:
643 636 user_model.add_extra_email(user_id, email)
644 637 Session().commit()
645 638 h.flash(_("Added new email address `%s` for user account") % email,
646 639 category='success')
647 640 except formencode.Invalid as error:
648 641 msg = error.error_dict['email']
649 642 h.flash(msg, category='error')
650 643 except Exception:
651 644 log.exception("Exception during email saving")
652 645 h.flash(_('An error occurred during email saving'),
653 646 category='error')
654 647 return redirect(url('edit_user_emails', user_id=user_id))
655 648
656 649 @HasPermissionAllDecorator('hg.admin')
657 650 @auth.CSRFRequired()
658 651 def delete_email(self, user_id):
659 652 """DELETE /user_emails_delete/user_id: Delete an existing item"""
660 653 # url('user_emails_delete', user_id=ID, method='delete')
661 654 user_id = safe_int(user_id)
662 655 c.user = User.get_or_404(user_id)
663 656 email_id = request.POST.get('del_email_id')
664 657 user_model = UserModel()
665 658 user_model.delete_extra_email(user_id, email_id)
666 659 Session().commit()
667 660 h.flash(_("Removed email address from user account"), category='success')
668 661 return redirect(url('edit_user_emails', user_id=user_id))
669 662
670 663 @HasPermissionAllDecorator('hg.admin')
671 664 def edit_ips(self, user_id):
672 665 user_id = safe_int(user_id)
673 666 c.user = User.get_or_404(user_id)
674 667 if c.user.username == User.DEFAULT_USER:
675 668 h.flash(_("You can't edit this user"), category='warning')
676 669 return redirect(url('users'))
677 670
678 671 c.active = 'ips'
679 672 c.user_ip_map = UserIpMap.query() \
680 673 .filter(UserIpMap.user == c.user).all()
681 674
682 675 c.inherit_default_ips = c.user.inherit_default_permissions
683 676 c.default_user_ip_map = UserIpMap.query() \
684 677 .filter(UserIpMap.user == User.get_default_user()).all()
685 678
686 679 defaults = c.user.get_dict()
687 680 return htmlfill.render(
688 681 render('admin/users/user_edit.mako'),
689 682 defaults=defaults,
690 683 encoding="UTF-8",
691 684 force_defaults=False)
692 685
693 686 @HasPermissionAllDecorator('hg.admin')
694 687 @auth.CSRFRequired()
695 688 def add_ip(self, user_id):
696 689 """POST /user_ips:Add an existing item"""
697 690 # url('user_ips', user_id=ID, method='put')
698 691
699 692 user_id = safe_int(user_id)
700 693 c.user = User.get_or_404(user_id)
701 694 user_model = UserModel()
702 695 try:
703 696 ip_list = user_model.parse_ip_range(request.POST.get('new_ip'))
704 697 except Exception as e:
705 698 ip_list = []
706 699 log.exception("Exception during ip saving")
707 700 h.flash(_('An error occurred during ip saving:%s' % (e,)),
708 701 category='error')
709 702
710 703 desc = request.POST.get('description')
711 704 added = []
712 705 for ip in ip_list:
713 706 try:
714 707 user_model.add_extra_ip(user_id, ip, desc)
715 708 Session().commit()
716 709 added.append(ip)
717 710 except formencode.Invalid as error:
718 711 msg = error.error_dict['ip']
719 712 h.flash(msg, category='error')
720 713 except Exception:
721 714 log.exception("Exception during ip saving")
722 715 h.flash(_('An error occurred during ip saving'),
723 716 category='error')
724 717 if added:
725 718 h.flash(
726 719 _("Added ips %s to user whitelist") % (', '.join(ip_list), ),
727 720 category='success')
728 721 if 'default_user' in request.POST:
729 722 return redirect(url('admin_permissions_ips'))
730 723 return redirect(url('edit_user_ips', user_id=user_id))
731 724
732 725 @HasPermissionAllDecorator('hg.admin')
733 726 @auth.CSRFRequired()
734 727 def delete_ip(self, user_id):
735 728 """DELETE /user_ips_delete/user_id: Delete an existing item"""
736 729 # url('user_ips_delete', user_id=ID, method='delete')
737 730 user_id = safe_int(user_id)
738 731 c.user = User.get_or_404(user_id)
739 732
740 733 ip_id = request.POST.get('del_ip_id')
741 734 user_model = UserModel()
742 735 user_model.delete_extra_ip(user_id, ip_id)
743 736 Session().commit()
744 737 h.flash(_("Removed ip address from user whitelist"), category='success')
745 738
746 739 if 'default_user' in request.POST:
747 740 return redirect(url('admin_permissions_ips'))
748 741 return redirect(url('edit_user_ips', user_id=user_id))
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,106 +1,80 b''
1 1 <div class="panel panel-default">
2 2 <div class="panel-heading">
3 3 <h3 class="panel-title">${_('Authentication Tokens')}</h3>
4 4 </div>
5 5 <div class="panel-body">
6 6 <p>
7 ${_('Built-in tokens can be used to authenticate with all possible options.')}<br/>
8 7 ${_('Each token can have a role. VCS tokens can be used together with the authtoken auth plugin for git/hg/svn operations.')}
9 8 </p>
10 9 <table class="rctable auth_tokens">
11 <tr>
12 <td class="truncate-wrap td-authtoken"><div class="user_auth_tokens truncate autoexpand"><code>${c.user.api_key}</code></div></td>
13 <td class="td-tags">
14 <span class="tag disabled">${_('Built-in')}</span>
15 </td>
16 <td class="td-tags">
17 % for token in c.user.builtin_token_roles:
18 <span class="tag disabled">
19 ${token}
20 </span>
21 % endfor
22 </td>
23 <td class="td-exp">${_('expires')}: ${_('never')}</td>
24 <td class="td-action">
25 ${h.secure_form(url('my_account_auth_tokens'),method='delete')}
26 ${h.hidden('del_auth_token',c.user.api_key)}
27 ${h.hidden('del_auth_token_builtin',1)}
28 <button class="btn-link btn-danger" type="submit"
29 onclick="return confirm('${_('Confirm to reset this auth token: %s') % c.user.api_key}');">
30 <i class="icon-refresh"></i>
31 ${_('Reset')}
32 </button>
33 ${h.end_form()}
34 </td>
35 </tr>
36 10 %if c.user_auth_tokens:
37 11 %for auth_token in c.user_auth_tokens:
38 12 <tr class="${'expired' if auth_token.expired else ''}">
39 13 <td class="truncate-wrap td-authtoken"><div class="user_auth_tokens truncate autoexpand"><code>${auth_token.api_key}</code></div></td>
40 14 <td class="td-wrap">${auth_token.description}</td>
41 15 <td class="td-tags">
42 16 <span class="tag disabled">${auth_token.role_humanized}</span>
43 17 </td>
44 18 <td class="td-exp">
45 19 %if auth_token.expires == -1:
46 20 ${_('expires')}: ${_('never')}
47 21 %else:
48 22 %if auth_token.expired:
49 23 ${_('expired')}: ${h.age_component(h.time_to_utcdatetime(auth_token.expires))}
50 24 %else:
51 25 ${_('expires')}: ${h.age_component(h.time_to_utcdatetime(auth_token.expires))}
52 26 %endif
53 27 %endif
54 28 </td>
55 29 <td class="td-action">
56 30 ${h.secure_form(url('my_account_auth_tokens'),method='delete')}
57 31 ${h.hidden('del_auth_token',auth_token.api_key)}
58 32 <button class="btn btn-link btn-danger" type="submit"
59 33 onclick="return confirm('${_('Confirm to remove this auth token: %s') % auth_token.api_key}');">
60 34 ${_('Delete')}
61 35 </button>
62 36 ${h.end_form()}
63 37 </td>
64 38 </tr>
65 39 %endfor
66 40 %else:
67 41 <tr><td><div class="ip">${_('No additional auth token specified')}</div></td></tr>
68 42 %endif
69 43 </table>
70 44
71 45 <div class="user_auth_tokens">
72 46 ${h.secure_form(url('my_account_auth_tokens'), method='post')}
73 47 <div class="form form-vertical">
74 48 <!-- fields -->
75 49 <div class="fields">
76 50 <div class="field">
77 51 <div class="label">
78 52 <label for="new_email">${_('New authentication token')}:</label>
79 53 </div>
80 54 <div class="input">
81 55 ${h.text('description', placeholder=_('Description'))}
82 56 ${h.select('lifetime', '', c.lifetime_options)}
83 57 ${h.select('role', '', c.role_options)}
84 58 </div>
85 59 </div>
86 60 <div class="buttons">
87 61 ${h.submit('save',_('Add'),class_="btn")}
88 62 ${h.reset('reset',_('Reset'),class_="btn")}
89 63 </div>
90 64 </div>
91 65 </div>
92 66 ${h.end_form()}
93 67 </div>
94 68 </div>
95 69 </div>
96 70 <script>
97 71 $(document).ready(function(){
98 72 var select2Options = {
99 73 'containerCssClass': "drop-menu",
100 74 'dropdownCssClass': "drop-menu-dropdown",
101 75 'dropdownAutoWidth': true
102 76 };
103 77 $("#lifetime").select2(select2Options);
104 78 $("#role").select2(select2Options);
105 79 });
106 80 </script>
@@ -1,107 +1,83 b''
1 1 <div class="panel panel-default">
2 2 <div class="panel-heading">
3 3 <h3 class="panel-title">${_('Authentication Access Tokens')}</h3>
4 4 </div>
5 5 <div class="panel-body">
6 6 <div class="apikeys_wrap">
7 7 <table class="rctable auth_tokens">
8 <tr>
9 <td class="truncate-wrap td-authtoken"><div class="user_auth_tokens truncate autoexpand"><code>${c.user.api_key}</code></div></td>
10 <td class="td-tags">
11 <span class="tag disabled">${_('Built-in')}</span>
12 </td>
13 <td class="td-tags">
14 % for token in c.user.builtin_token_roles:
15 <span class="tag disabled">
16 ${token}
17 </span>
18 % endfor
19 </td>
20 <td class="td-exp">${_('expires')}: ${_('never')}</td>
21 <td class="td-action">
22 ${h.secure_form(url('edit_user_auth_tokens', user_id=c.user.user_id),method='delete')}
23 ${h.hidden('del_auth_token',c.user.api_key)}
24 ${h.hidden('del_auth_token_builtin',1)}
25 <button class="btn btn-link btn-danger" type="submit"
26 onclick="return confirm('${_('Confirm to reset this auth token: %s') % c.user.api_key}');">
27 ${_('Reset')}
28 </button>
29 ${h.end_form()}
30 </td>
31 </tr>
32 8 %if c.user_auth_tokens:
33 9 %for auth_token in c.user_auth_tokens:
34 10 <tr class="${'expired' if auth_token.expired else ''}">
35 11 <td class="truncate-wrap td-authtoken"><div class="user_auth_tokens truncate autoexpand"><code>${auth_token.api_key}</code></div></td>
36 12 <td class="td-wrap">${auth_token.description}</td>
37 13 <td class="td-tags">
38 14 <span class="tag">${auth_token.role_humanized}</span>
39 15 </td>
40 16 <td class="td-exp">
41 17 %if auth_token.expires == -1:
42 18 ${_('expires')}: ${_('never')}
43 19 %else:
44 20 %if auth_token.expired:
45 21 ${_('expired')}: ${h.age_component(h.time_to_utcdatetime(auth_token.expires))}
46 22 %else:
47 23 ${_('expires')}: ${h.age_component(h.time_to_utcdatetime(auth_token.expires))}
48 24 %endif
49 25 %endif
50 26 </td>
51 27 <td>
52 28 ${h.secure_form(url('edit_user_auth_tokens', user_id=c.user.user_id),method='delete')}
53 29 ${h.hidden('del_auth_token',auth_token.api_key)}
54 30 <button class="btn btn-link btn-danger" type="submit"
55 31 onclick="return confirm('${_('Confirm to remove this auth token: %s') % auth_token.api_key}');">
56 32 ${_('Delete')}
57 33 </button>
58 34 ${h.end_form()}
59 35 </td>
60 36 </tr>
61 37 %endfor
62 38 %else:
63 39 <tr><td><div class="ip">${_('No additional auth tokens specified')}</div></td></tr>
64 40 %endif
65 41 </table>
66 42 </div>
67 43
68 44 <div class="user_auth_tokens">
69 45 ${h.secure_form(url('edit_user_auth_tokens', user_id=c.user.user_id), method='put')}
70 46 <div class="form form-vertical">
71 47 <!-- fields -->
72 48 <div class="fields">
73 49 <div class="field">
74 50 <div class="label">
75 51 <label for="new_email">${_('New auth token')}:</label>
76 52 </div>
77 53 <div class="input">
78 54 ${h.text('description', class_='medium', placeholder=_('Description'))}
79 55 ${h.select('lifetime', '', c.lifetime_options)}
80 56 ${h.select('role', '', c.role_options)}
81 57 </div>
82 58 </div>
83 59 <div class="buttons">
84 60 ${h.submit('save',_('Add'),class_="btn btn-small")}
85 61 ${h.reset('reset',_('Reset'),class_="btn btn-small")}
86 62 </div>
87 63 </div>
88 64 </div>
89 65 ${h.end_form()}
90 66 </div>
91 67 </div>
92 68 </div>
93 69
94 70 <script>
95 71 $(document).ready(function(){
96 72 $("#lifetime").select2({
97 73 'containerCssClass': "drop-menu",
98 74 'dropdownCssClass': "drop-menu-dropdown",
99 75 'dropdownAutoWidth': true
100 76 });
101 77 $("#role").select2({
102 78 'containerCssClass': "drop-menu",
103 79 'dropdownCssClass': "drop-menu-dropdown",
104 80 'dropdownAutoWidth': true
105 81 });
106 82 })
107 83 </script>
@@ -1,400 +1,384 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.lib import helpers as h
24 24 from rhodecode.lib.auth import check_password
25 25 from rhodecode.model.db import User, UserFollowing, Repository, UserApiKeys
26 26 from rhodecode.model.meta import Session
27 27 from rhodecode.tests import (
28 28 TestController, url, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_EMAIL,
29 29 assert_session_flash)
30 30 from rhodecode.tests.fixture import Fixture
31 31 from rhodecode.tests.utils import AssertResponse
32 32
33 33 fixture = Fixture()
34 34
35 35
36 36 class TestMyAccountController(TestController):
37 37 test_user_1 = 'testme'
38 38 test_user_1_password = '0jd83nHNS/d23n'
39 39 destroy_users = set()
40 40
41 41 @classmethod
42 42 def teardown_class(cls):
43 43 fixture.destroy_users(cls.destroy_users)
44 44
45 45 def test_my_account(self):
46 46 self.log_user()
47 47 response = self.app.get(url('my_account'))
48 48
49 49 response.mustcontain('test_admin')
50 50 response.mustcontain('href="/_admin/my_account/edit"')
51 51
52 52 def test_logout_form_contains_csrf(self, autologin_user, csrf_token):
53 53 response = self.app.get(url('my_account'))
54 54 assert_response = AssertResponse(response)
55 55 element = assert_response.get_element('.logout #csrf_token')
56 56 assert element.value == csrf_token
57 57
58 58 def test_my_account_edit(self):
59 59 self.log_user()
60 60 response = self.app.get(url('my_account_edit'))
61 61
62 62 response.mustcontain('value="test_admin')
63 63
64 64 def test_my_account_my_repos(self):
65 65 self.log_user()
66 66 response = self.app.get(url('my_account_repos'))
67 67 repos = Repository.query().filter(
68 68 Repository.user == User.get_by_username(
69 69 TEST_USER_ADMIN_LOGIN)).all()
70 70 for repo in repos:
71 71 response.mustcontain('"name_raw": "%s"' % repo.repo_name)
72 72
73 73 def test_my_account_my_watched(self):
74 74 self.log_user()
75 75 response = self.app.get(url('my_account_watched'))
76 76
77 77 repos = UserFollowing.query().filter(
78 78 UserFollowing.user == User.get_by_username(
79 79 TEST_USER_ADMIN_LOGIN)).all()
80 80 for repo in repos:
81 81 response.mustcontain(
82 82 '"name_raw": "%s"' % repo.follows_repository.repo_name)
83 83
84 84 @pytest.mark.backends("git", "hg")
85 85 def test_my_account_my_pullrequests(self, pr_util):
86 86 self.log_user()
87 87 response = self.app.get(url('my_account_pullrequests'))
88 88 response.mustcontain('There are currently no open pull '
89 89 'requests requiring your participation.')
90 90
91 91 pr = pr_util.create_pull_request(title='TestMyAccountPR')
92 92 response = self.app.get(url('my_account_pullrequests'))
93 93 response.mustcontain('"name_raw": %s' % pr.pull_request_id)
94 94 response.mustcontain('TestMyAccountPR')
95 95
96 96 def test_my_account_my_emails(self):
97 97 self.log_user()
98 98 response = self.app.get(url('my_account_emails'))
99 99 response.mustcontain('No additional emails specified')
100 100
101 101 def test_my_account_my_emails_add_existing_email(self):
102 102 self.log_user()
103 103 response = self.app.get(url('my_account_emails'))
104 104 response.mustcontain('No additional emails specified')
105 105 response = self.app.post(url('my_account_emails'),
106 106 {'new_email': TEST_USER_REGULAR_EMAIL,
107 107 'csrf_token': self.csrf_token})
108 108 assert_session_flash(response, 'This e-mail address is already taken')
109 109
110 110 def test_my_account_my_emails_add_mising_email_in_form(self):
111 111 self.log_user()
112 112 response = self.app.get(url('my_account_emails'))
113 113 response.mustcontain('No additional emails specified')
114 114 response = self.app.post(url('my_account_emails'),
115 115 {'csrf_token': self.csrf_token})
116 116 assert_session_flash(response, 'Please enter an email address')
117 117
118 118 def test_my_account_my_emails_add_remove(self):
119 119 self.log_user()
120 120 response = self.app.get(url('my_account_emails'))
121 121 response.mustcontain('No additional emails specified')
122 122
123 123 response = self.app.post(url('my_account_emails'),
124 124 {'new_email': 'foo@barz.com',
125 125 'csrf_token': self.csrf_token})
126 126
127 127 response = self.app.get(url('my_account_emails'))
128 128
129 129 from rhodecode.model.db import UserEmailMap
130 130 email_id = UserEmailMap.query().filter(
131 131 UserEmailMap.user == User.get_by_username(
132 132 TEST_USER_ADMIN_LOGIN)).filter(
133 133 UserEmailMap.email == 'foo@barz.com').one().email_id
134 134
135 135 response.mustcontain('foo@barz.com')
136 136 response.mustcontain('<input id="del_email_id" name="del_email_id" '
137 137 'type="hidden" value="%s" />' % email_id)
138 138
139 139 response = self.app.post(
140 140 url('my_account_emails'), {
141 141 'del_email_id': email_id, '_method': 'delete',
142 142 'csrf_token': self.csrf_token})
143 143 assert_session_flash(response, 'Removed email address from user account')
144 144 response = self.app.get(url('my_account_emails'))
145 145 response.mustcontain('No additional emails specified')
146 146
147 147 @pytest.mark.parametrize(
148 148 "name, attrs", [
149 149 ('firstname', {'firstname': 'new_username'}),
150 150 ('lastname', {'lastname': 'new_username'}),
151 151 ('admin', {'admin': True}),
152 152 ('admin', {'admin': False}),
153 153 ('extern_type', {'extern_type': 'ldap'}),
154 154 ('extern_type', {'extern_type': None}),
155 155 # ('extern_name', {'extern_name': 'test'}),
156 156 # ('extern_name', {'extern_name': None}),
157 157 ('active', {'active': False}),
158 158 ('active', {'active': True}),
159 159 ('email', {'email': 'some@email.com'}),
160 160 ])
161 161 def test_my_account_update(self, name, attrs):
162 162 usr = fixture.create_user(self.test_user_1,
163 163 password=self.test_user_1_password,
164 164 email='testme@rhodecode.org',
165 165 extern_type='rhodecode',
166 166 extern_name=self.test_user_1,
167 167 skip_if_exists=True)
168 168 self.destroy_users.add(self.test_user_1)
169 169
170 170 params = usr.get_api_data() # current user data
171 171 user_id = usr.user_id
172 172 self.log_user(
173 173 username=self.test_user_1, password=self.test_user_1_password)
174 174
175 175 params.update({'password_confirmation': ''})
176 176 params.update({'new_password': ''})
177 177 params.update({'extern_type': 'rhodecode'})
178 178 params.update({'extern_name': self.test_user_1})
179 179 params.update({'csrf_token': self.csrf_token})
180 180
181 181 params.update(attrs)
182 182 # my account page cannot set language param yet, only for admins
183 183 del params['language']
184 184 response = self.app.post(url('my_account'), params)
185 185
186 186 assert_session_flash(
187 187 response, 'Your account was updated successfully')
188 188
189 189 del params['csrf_token']
190 190
191 191 updated_user = User.get_by_username(self.test_user_1)
192 192 updated_params = updated_user.get_api_data()
193 193 updated_params.update({'password_confirmation': ''})
194 194 updated_params.update({'new_password': ''})
195 195
196 196 params['last_login'] = updated_params['last_login']
197 197 # my account page cannot set language param yet, only for admins
198 198 # but we get this info from API anyway
199 199 params['language'] = updated_params['language']
200 200
201 201 if name == 'email':
202 202 params['emails'] = [attrs['email']]
203 203 if name == 'extern_type':
204 204 # cannot update this via form, expected value is original one
205 205 params['extern_type'] = "rhodecode"
206 206 if name == 'extern_name':
207 207 # cannot update this via form, expected value is original one
208 208 params['extern_name'] = str(user_id)
209 209 if name == 'active':
210 210 # my account cannot deactivate account
211 211 params['active'] = True
212 212 if name == 'admin':
213 213 # my account cannot make you an admin !
214 214 params['admin'] = False
215 215
216 216 assert params == updated_params
217 217
218 218 def test_my_account_update_err_email_exists(self):
219 219 self.log_user()
220 220
221 221 new_email = 'test_regular@mail.com' # already exisitn email
222 222 response = self.app.post(url('my_account'),
223 223 params={
224 224 'username': 'test_admin',
225 225 'new_password': 'test12',
226 226 'password_confirmation': 'test122',
227 227 'firstname': 'NewName',
228 228 'lastname': 'NewLastname',
229 229 'email': new_email,
230 230 'csrf_token': self.csrf_token,
231 231 })
232 232
233 233 response.mustcontain('This e-mail address is already taken')
234 234
235 235 def test_my_account_update_err(self):
236 236 self.log_user('test_regular2', 'test12')
237 237
238 238 new_email = 'newmail.pl'
239 239 response = self.app.post(url('my_account'),
240 240 params={
241 241 'username': 'test_admin',
242 242 'new_password': 'test12',
243 243 'password_confirmation': 'test122',
244 244 'firstname': 'NewName',
245 245 'lastname': 'NewLastname',
246 246 'email': new_email,
247 247 'csrf_token': self.csrf_token,
248 248 })
249 249
250 250 response.mustcontain('An email address must contain a single @')
251 251 from rhodecode.model import validators
252 252 msg = validators.ValidUsername(
253 253 edit=False, old_data={})._messages['username_exists']
254 254 msg = h.html_escape(msg % {'username': 'test_admin'})
255 255 response.mustcontain(u"%s" % msg)
256 256
257 257 def test_my_account_auth_tokens(self):
258 258 usr = self.log_user('test_regular2', 'test12')
259 259 user = User.get(usr['user_id'])
260 260 response = self.app.get(url('my_account_auth_tokens'))
261 261 response.mustcontain(user.api_key)
262 262 response.mustcontain('expires: never')
263 263
264 264 @pytest.mark.parametrize("desc, lifetime", [
265 265 ('forever', -1),
266 266 ('5mins', 60*5),
267 267 ('30days', 60*60*24*30),
268 268 ])
269 269 def test_my_account_add_auth_tokens(self, desc, lifetime):
270 270 usr = self.log_user('test_regular2', 'test12')
271 271 user = User.get(usr['user_id'])
272 272 response = self.app.post(url('my_account_auth_tokens'),
273 273 {'description': desc, 'lifetime': lifetime,
274 274 'csrf_token': self.csrf_token})
275 275 assert_session_flash(response, 'Auth token successfully created')
276 276 try:
277 277 response = response.follow()
278 278 user = User.get(usr['user_id'])
279 279 for auth_token in user.auth_tokens:
280 280 response.mustcontain(auth_token)
281 281 finally:
282 282 for auth_token in UserApiKeys.query().all():
283 283 Session().delete(auth_token)
284 284 Session().commit()
285 285
286 286 def test_my_account_remove_auth_token(self, user_util):
287 287 user = user_util.create_user(password=self.test_user_1_password)
288 288 user_id = user.user_id
289 289 self.log_user(user.username, self.test_user_1_password)
290 290
291 291 user = User.get(user_id)
292 292 keys = user.extra_auth_tokens
293 293 assert 1 == len(keys)
294 294
295 295 response = self.app.post(url('my_account_auth_tokens'),
296 296 {'description': 'desc', 'lifetime': -1,
297 297 'csrf_token': self.csrf_token})
298 298 assert_session_flash(response, 'Auth token successfully created')
299 299 response.follow()
300 300
301 301 user = User.get(user_id)
302 302 keys = user.extra_auth_tokens
303 303 assert 2 == len(keys)
304 304
305 305 response = self.app.post(
306 306 url('my_account_auth_tokens'),
307 307 {'_method': 'delete', 'del_auth_token': keys[0].api_key,
308 308 'csrf_token': self.csrf_token})
309 309 assert_session_flash(response, 'Auth token successfully deleted')
310 310
311 311 user = User.get(user_id)
312 312 keys = user.extra_auth_tokens
313 313 assert 1 == len(keys)
314 314
315 def test_my_account_reset_main_auth_token(self):
316 usr = self.log_user('test_regular2', 'test12')
317 user = User.get(usr['user_id'])
318 api_key = user.api_key
319 response = self.app.get(url('my_account_auth_tokens'))
320 response.mustcontain(api_key)
321 response.mustcontain('expires: never')
322
323 response = self.app.post(
324 url('my_account_auth_tokens'),
325 {'_method': 'delete', 'del_auth_token_builtin': api_key,
326 'csrf_token': self.csrf_token})
327 assert_session_flash(response, 'Auth token successfully reset')
328 response = response.follow()
329 response.mustcontain(no=[api_key])
330
331 315 def test_valid_change_password(self, user_util):
332 316 new_password = 'my_new_valid_password'
333 317 user = user_util.create_user(password=self.test_user_1_password)
334 318 session = self.log_user(user.username, self.test_user_1_password)
335 319 form_data = [
336 320 ('current_password', self.test_user_1_password),
337 321 ('__start__', 'new_password:mapping'),
338 322 ('new_password', new_password),
339 323 ('new_password-confirm', new_password),
340 324 ('__end__', 'new_password:mapping'),
341 325 ('csrf_token', self.csrf_token),
342 326 ]
343 327 response = self.app.post(url('my_account_password'), form_data).follow()
344 328 assert 'Successfully updated password' in response
345 329
346 330 # check_password depends on user being in session
347 331 Session().add(user)
348 332 try:
349 333 assert check_password(new_password, user.password)
350 334 finally:
351 335 Session().expunge(user)
352 336
353 337 @pytest.mark.parametrize('current_pw,new_pw,confirm_pw', [
354 338 ('', 'abcdef123', 'abcdef123'),
355 339 ('wrong_pw', 'abcdef123', 'abcdef123'),
356 340 (test_user_1_password, test_user_1_password, test_user_1_password),
357 341 (test_user_1_password, '', ''),
358 342 (test_user_1_password, 'abcdef123', ''),
359 343 (test_user_1_password, '', 'abcdef123'),
360 344 (test_user_1_password, 'not_the', 'same_pw'),
361 345 (test_user_1_password, 'short', 'short'),
362 346 ])
363 347 def test_invalid_change_password(self, current_pw, new_pw, confirm_pw,
364 348 user_util):
365 349 user = user_util.create_user(password=self.test_user_1_password)
366 350 session = self.log_user(user.username, self.test_user_1_password)
367 351 old_password_hash = session['password']
368 352 form_data = [
369 353 ('current_password', current_pw),
370 354 ('__start__', 'new_password:mapping'),
371 355 ('new_password', new_pw),
372 356 ('new_password-confirm', confirm_pw),
373 357 ('__end__', 'new_password:mapping'),
374 358 ('csrf_token', self.csrf_token),
375 359 ]
376 360 response = self.app.post(url('my_account_password'), form_data)
377 361 assert 'Error occurred' in response
378 362
379 363 def test_password_is_updated_in_session_on_password_change(self, user_util):
380 364 old_password = 'abcdef123'
381 365 new_password = 'abcdef124'
382 366
383 367 user = user_util.create_user(password=old_password)
384 368 session = self.log_user(user.username, old_password)
385 369 old_password_hash = session['password']
386 370
387 371 form_data = [
388 372 ('current_password', old_password),
389 373 ('__start__', 'new_password:mapping'),
390 374 ('new_password', new_password),
391 375 ('new_password-confirm', new_password),
392 376 ('__end__', 'new_password:mapping'),
393 377 ('csrf_token', self.csrf_token),
394 378 ]
395 379 self.app.post(url('my_account_password'), form_data)
396 380
397 381 response = self.app.get(url('home'))
398 382 new_password_hash = response.session['rhodecode_user']['password']
399 383
400 384 assert old_password_hash != new_password_hash
@@ -1,644 +1,627 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22 from sqlalchemy.orm.exc import NoResultFound
23 23
24 24 from rhodecode.lib.auth import check_password
25 25 from rhodecode.lib import helpers as h
26 26 from rhodecode.model import validators
27 27 from rhodecode.model.db import User, UserIpMap, UserApiKeys
28 28 from rhodecode.model.meta import Session
29 29 from rhodecode.model.user import UserModel
30 30 from rhodecode.tests import (
31 31 TestController, url, link_to, TEST_USER_ADMIN_LOGIN,
32 32 TEST_USER_REGULAR_LOGIN, assert_session_flash)
33 33 from rhodecode.tests.fixture import Fixture
34 34 from rhodecode.tests.utils import AssertResponse
35 35
36 36 fixture = Fixture()
37 37
38 38
39 39 class TestAdminUsersController(TestController):
40 40 test_user_1 = 'testme'
41 41 destroy_users = set()
42 42
43 43 @classmethod
44 44 def teardown_method(cls, method):
45 45 fixture.destroy_users(cls.destroy_users)
46 46
47 47 def test_index(self):
48 48 self.log_user()
49 49 self.app.get(url('users'))
50 50
51 51 def test_create(self):
52 52 self.log_user()
53 53 username = 'newtestuser'
54 54 password = 'test12'
55 55 password_confirmation = password
56 56 name = 'name'
57 57 lastname = 'lastname'
58 58 email = 'mail@mail.com'
59 59
60 60 response = self.app.get(url('new_user'))
61 61
62 62 response = self.app.post(url('users'), params={
63 63 'username': username,
64 64 'password': password,
65 65 'password_confirmation': password_confirmation,
66 66 'firstname': name,
67 67 'active': True,
68 68 'lastname': lastname,
69 69 'extern_name': 'rhodecode',
70 70 'extern_type': 'rhodecode',
71 71 'email': email,
72 72 'csrf_token': self.csrf_token,
73 73 })
74 74 user_link = link_to(
75 75 username,
76 76 url('edit_user', user_id=User.get_by_username(username).user_id))
77 77 assert_session_flash(response, 'Created user %s' % (user_link,))
78 78 self.destroy_users.add(username)
79 79
80 80 new_user = User.query().filter(User.username == username).one()
81 81
82 82 assert new_user.username == username
83 83 assert check_password(password, new_user.password)
84 84 assert new_user.name == name
85 85 assert new_user.lastname == lastname
86 86 assert new_user.email == email
87 87
88 88 response.follow()
89 89 response = response.follow()
90 90 response.mustcontain(username)
91 91
92 92 def test_create_err(self):
93 93 self.log_user()
94 94 username = 'new_user'
95 95 password = ''
96 96 name = 'name'
97 97 lastname = 'lastname'
98 98 email = 'errmail.com'
99 99
100 100 response = self.app.get(url('new_user'))
101 101
102 102 response = self.app.post(url('users'), params={
103 103 'username': username,
104 104 'password': password,
105 105 'name': name,
106 106 'active': False,
107 107 'lastname': lastname,
108 108 'email': email,
109 109 'csrf_token': self.csrf_token,
110 110 })
111 111
112 112 msg = validators.ValidUsername(
113 113 False, {})._messages['system_invalid_username']
114 114 msg = h.html_escape(msg % {'username': 'new_user'})
115 115 response.mustcontain('<span class="error-message">%s</span>' % msg)
116 116 response.mustcontain(
117 117 '<span class="error-message">Please enter a value</span>')
118 118 response.mustcontain(
119 119 '<span class="error-message">An email address must contain a'
120 120 ' single @</span>')
121 121
122 122 def get_user():
123 123 Session().query(User).filter(User.username == username).one()
124 124
125 125 with pytest.raises(NoResultFound):
126 126 get_user()
127 127
128 128 def test_new(self):
129 129 self.log_user()
130 130 self.app.get(url('new_user'))
131 131
132 132 @pytest.mark.parametrize("name, attrs", [
133 133 ('firstname', {'firstname': 'new_username'}),
134 134 ('lastname', {'lastname': 'new_username'}),
135 135 ('admin', {'admin': True}),
136 136 ('admin', {'admin': False}),
137 137 ('extern_type', {'extern_type': 'ldap'}),
138 138 ('extern_type', {'extern_type': None}),
139 139 ('extern_name', {'extern_name': 'test'}),
140 140 ('extern_name', {'extern_name': None}),
141 141 ('active', {'active': False}),
142 142 ('active', {'active': True}),
143 143 ('email', {'email': 'some@email.com'}),
144 144 ('language', {'language': 'de'}),
145 145 ('language', {'language': 'en'}),
146 146 # ('new_password', {'new_password': 'foobar123',
147 147 # 'password_confirmation': 'foobar123'})
148 148 ])
149 149 def test_update(self, name, attrs):
150 150 self.log_user()
151 151 usr = fixture.create_user(self.test_user_1, password='qweqwe',
152 152 email='testme@rhodecode.org',
153 153 extern_type='rhodecode',
154 154 extern_name=self.test_user_1,
155 155 skip_if_exists=True)
156 156 Session().commit()
157 157 self.destroy_users.add(self.test_user_1)
158 158 params = usr.get_api_data()
159 159 cur_lang = params['language'] or 'en'
160 160 params.update({
161 161 'password_confirmation': '',
162 162 'new_password': '',
163 163 'language': cur_lang,
164 164 '_method': 'put',
165 165 'csrf_token': self.csrf_token,
166 166 })
167 167 params.update({'new_password': ''})
168 168 params.update(attrs)
169 169 if name == 'email':
170 170 params['emails'] = [attrs['email']]
171 171 elif name == 'extern_type':
172 172 # cannot update this via form, expected value is original one
173 173 params['extern_type'] = "rhodecode"
174 174 elif name == 'extern_name':
175 175 # cannot update this via form, expected value is original one
176 176 params['extern_name'] = self.test_user_1
177 177 # special case since this user is not
178 178 # logged in yet his data is not filled
179 179 # so we use creation data
180 180
181 181 response = self.app.post(url('user', user_id=usr.user_id), params)
182 182 assert response.status_int == 302
183 183 assert_session_flash(response, 'User updated successfully')
184 184
185 185 updated_user = User.get_by_username(self.test_user_1)
186 186 updated_params = updated_user.get_api_data()
187 187 updated_params.update({'password_confirmation': ''})
188 188 updated_params.update({'new_password': ''})
189 189
190 190 del params['_method']
191 191 del params['csrf_token']
192 192 assert params == updated_params
193 193
194 194 def test_update_and_migrate_password(
195 195 self, autologin_user, real_crypto_backend):
196 196 from rhodecode.lib import auth
197 197
198 198 # create new user, with sha256 password
199 199 temp_user = 'test_admin_sha256'
200 200 user = fixture.create_user(temp_user)
201 201 user.password = auth._RhodeCodeCryptoSha256().hash_create(
202 202 b'test123')
203 203 Session().add(user)
204 204 Session().commit()
205 205 self.destroy_users.add('test_admin_sha256')
206 206
207 207 params = user.get_api_data()
208 208
209 209 params.update({
210 210 'password_confirmation': 'qweqwe123',
211 211 'new_password': 'qweqwe123',
212 212 'language': 'en',
213 213 '_method': 'put',
214 214 'csrf_token': autologin_user.csrf_token,
215 215 })
216 216
217 217 response = self.app.post(url('user', user_id=user.user_id), params)
218 218 assert response.status_int == 302
219 219 assert_session_flash(response, 'User updated successfully')
220 220
221 221 # new password should be bcrypted, after log-in and transfer
222 222 user = User.get_by_username(temp_user)
223 223 assert user.password.startswith('$')
224 224
225 225 updated_user = User.get_by_username(temp_user)
226 226 updated_params = updated_user.get_api_data()
227 227 updated_params.update({'password_confirmation': 'qweqwe123'})
228 228 updated_params.update({'new_password': 'qweqwe123'})
229 229
230 230 del params['_method']
231 231 del params['csrf_token']
232 232 assert params == updated_params
233 233
234 234 def test_delete(self):
235 235 self.log_user()
236 236 username = 'newtestuserdeleteme'
237 237
238 238 fixture.create_user(name=username)
239 239
240 240 new_user = Session().query(User)\
241 241 .filter(User.username == username).one()
242 242 response = self.app.post(url('user', user_id=new_user.user_id),
243 243 params={'_method': 'delete',
244 244 'csrf_token': self.csrf_token})
245 245
246 246 assert_session_flash(response, 'Successfully deleted user')
247 247
248 248 def test_delete_owner_of_repository(self):
249 249 self.log_user()
250 250 username = 'newtestuserdeleteme_repo_owner'
251 251 obj_name = 'test_repo'
252 252 usr = fixture.create_user(name=username)
253 253 self.destroy_users.add(username)
254 254 fixture.create_repo(obj_name, cur_user=usr.username)
255 255
256 256 new_user = Session().query(User)\
257 257 .filter(User.username == username).one()
258 258 response = self.app.post(url('user', user_id=new_user.user_id),
259 259 params={'_method': 'delete',
260 260 'csrf_token': self.csrf_token})
261 261
262 262 msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
263 263 'Switch owners or remove those repositories:%s' % (username,
264 264 obj_name)
265 265 assert_session_flash(response, msg)
266 266 fixture.destroy_repo(obj_name)
267 267
268 268 def test_delete_owner_of_repository_detaching(self):
269 269 self.log_user()
270 270 username = 'newtestuserdeleteme_repo_owner_detach'
271 271 obj_name = 'test_repo'
272 272 usr = fixture.create_user(name=username)
273 273 self.destroy_users.add(username)
274 274 fixture.create_repo(obj_name, cur_user=usr.username)
275 275
276 276 new_user = Session().query(User)\
277 277 .filter(User.username == username).one()
278 278 response = self.app.post(url('user', user_id=new_user.user_id),
279 279 params={'_method': 'delete',
280 280 'user_repos': 'detach',
281 281 'csrf_token': self.csrf_token})
282 282
283 283 msg = 'Detached 1 repositories'
284 284 assert_session_flash(response, msg)
285 285 fixture.destroy_repo(obj_name)
286 286
287 287 def test_delete_owner_of_repository_deleting(self):
288 288 self.log_user()
289 289 username = 'newtestuserdeleteme_repo_owner_delete'
290 290 obj_name = 'test_repo'
291 291 usr = fixture.create_user(name=username)
292 292 self.destroy_users.add(username)
293 293 fixture.create_repo(obj_name, cur_user=usr.username)
294 294
295 295 new_user = Session().query(User)\
296 296 .filter(User.username == username).one()
297 297 response = self.app.post(url('user', user_id=new_user.user_id),
298 298 params={'_method': 'delete',
299 299 'user_repos': 'delete',
300 300 'csrf_token': self.csrf_token})
301 301
302 302 msg = 'Deleted 1 repositories'
303 303 assert_session_flash(response, msg)
304 304
305 305 def test_delete_owner_of_repository_group(self):
306 306 self.log_user()
307 307 username = 'newtestuserdeleteme_repo_group_owner'
308 308 obj_name = 'test_group'
309 309 usr = fixture.create_user(name=username)
310 310 self.destroy_users.add(username)
311 311 fixture.create_repo_group(obj_name, cur_user=usr.username)
312 312
313 313 new_user = Session().query(User)\
314 314 .filter(User.username == username).one()
315 315 response = self.app.post(url('user', user_id=new_user.user_id),
316 316 params={'_method': 'delete',
317 317 'csrf_token': self.csrf_token})
318 318
319 319 msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
320 320 'Switch owners or remove those repository groups:%s' % (username,
321 321 obj_name)
322 322 assert_session_flash(response, msg)
323 323 fixture.destroy_repo_group(obj_name)
324 324
325 325 def test_delete_owner_of_repository_group_detaching(self):
326 326 self.log_user()
327 327 username = 'newtestuserdeleteme_repo_group_owner_detach'
328 328 obj_name = 'test_group'
329 329 usr = fixture.create_user(name=username)
330 330 self.destroy_users.add(username)
331 331 fixture.create_repo_group(obj_name, cur_user=usr.username)
332 332
333 333 new_user = Session().query(User)\
334 334 .filter(User.username == username).one()
335 335 response = self.app.post(url('user', user_id=new_user.user_id),
336 336 params={'_method': 'delete',
337 337 'user_repo_groups': 'delete',
338 338 'csrf_token': self.csrf_token})
339 339
340 340 msg = 'Deleted 1 repository groups'
341 341 assert_session_flash(response, msg)
342 342
343 343 def test_delete_owner_of_repository_group_deleting(self):
344 344 self.log_user()
345 345 username = 'newtestuserdeleteme_repo_group_owner_delete'
346 346 obj_name = 'test_group'
347 347 usr = fixture.create_user(name=username)
348 348 self.destroy_users.add(username)
349 349 fixture.create_repo_group(obj_name, cur_user=usr.username)
350 350
351 351 new_user = Session().query(User)\
352 352 .filter(User.username == username).one()
353 353 response = self.app.post(url('user', user_id=new_user.user_id),
354 354 params={'_method': 'delete',
355 355 'user_repo_groups': 'detach',
356 356 'csrf_token': self.csrf_token})
357 357
358 358 msg = 'Detached 1 repository groups'
359 359 assert_session_flash(response, msg)
360 360 fixture.destroy_repo_group(obj_name)
361 361
362 362 def test_delete_owner_of_user_group(self):
363 363 self.log_user()
364 364 username = 'newtestuserdeleteme_user_group_owner'
365 365 obj_name = 'test_user_group'
366 366 usr = fixture.create_user(name=username)
367 367 self.destroy_users.add(username)
368 368 fixture.create_user_group(obj_name, cur_user=usr.username)
369 369
370 370 new_user = Session().query(User)\
371 371 .filter(User.username == username).one()
372 372 response = self.app.post(url('user', user_id=new_user.user_id),
373 373 params={'_method': 'delete',
374 374 'csrf_token': self.csrf_token})
375 375
376 376 msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
377 377 'Switch owners or remove those user groups:%s' % (username,
378 378 obj_name)
379 379 assert_session_flash(response, msg)
380 380 fixture.destroy_user_group(obj_name)
381 381
382 382 def test_delete_owner_of_user_group_detaching(self):
383 383 self.log_user()
384 384 username = 'newtestuserdeleteme_user_group_owner_detaching'
385 385 obj_name = 'test_user_group'
386 386 usr = fixture.create_user(name=username)
387 387 self.destroy_users.add(username)
388 388 fixture.create_user_group(obj_name, cur_user=usr.username)
389 389
390 390 new_user = Session().query(User)\
391 391 .filter(User.username == username).one()
392 392 try:
393 393 response = self.app.post(url('user', user_id=new_user.user_id),
394 394 params={'_method': 'delete',
395 395 'user_user_groups': 'detach',
396 396 'csrf_token': self.csrf_token})
397 397
398 398 msg = 'Detached 1 user groups'
399 399 assert_session_flash(response, msg)
400 400 finally:
401 401 fixture.destroy_user_group(obj_name)
402 402
403 403 def test_delete_owner_of_user_group_deleting(self):
404 404 self.log_user()
405 405 username = 'newtestuserdeleteme_user_group_owner_deleting'
406 406 obj_name = 'test_user_group'
407 407 usr = fixture.create_user(name=username)
408 408 self.destroy_users.add(username)
409 409 fixture.create_user_group(obj_name, cur_user=usr.username)
410 410
411 411 new_user = Session().query(User)\
412 412 .filter(User.username == username).one()
413 413 response = self.app.post(url('user', user_id=new_user.user_id),
414 414 params={'_method': 'delete',
415 415 'user_user_groups': 'delete',
416 416 'csrf_token': self.csrf_token})
417 417
418 418 msg = 'Deleted 1 user groups'
419 419 assert_session_flash(response, msg)
420 420
421 421 def test_show(self):
422 422 self.app.get(url('user', user_id=1))
423 423
424 424 def test_edit(self):
425 425 self.log_user()
426 426 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
427 427 self.app.get(url('edit_user', user_id=user.user_id))
428 428
429 429 @pytest.mark.parametrize(
430 430 'repo_create, repo_create_write, user_group_create, repo_group_create,'
431 431 'fork_create, inherit_default_permissions, expect_error,'
432 432 'expect_form_error', [
433 433 ('hg.create.none', 'hg.create.write_on_repogroup.false',
434 434 'hg.usergroup.create.false', 'hg.repogroup.create.false',
435 435 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
436 436 ('hg.create.repository', 'hg.create.write_on_repogroup.false',
437 437 'hg.usergroup.create.false', 'hg.repogroup.create.false',
438 438 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
439 439 ('hg.create.repository', 'hg.create.write_on_repogroup.true',
440 440 'hg.usergroup.create.true', 'hg.repogroup.create.true',
441 441 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
442 442 False),
443 443 ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
444 444 'hg.usergroup.create.true', 'hg.repogroup.create.true',
445 445 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
446 446 True),
447 447 ('', '', '', '', '', '', True, False),
448 448 ])
449 449 def test_global_perms_on_user(
450 450 self, repo_create, repo_create_write, user_group_create,
451 451 repo_group_create, fork_create, expect_error, expect_form_error,
452 452 inherit_default_permissions):
453 453 self.log_user()
454 454 user = fixture.create_user('dummy')
455 455 uid = user.user_id
456 456
457 457 # ENABLE REPO CREATE ON A GROUP
458 458 perm_params = {
459 459 'inherit_default_permissions': False,
460 460 'default_repo_create': repo_create,
461 461 'default_repo_create_on_write': repo_create_write,
462 462 'default_user_group_create': user_group_create,
463 463 'default_repo_group_create': repo_group_create,
464 464 'default_fork_create': fork_create,
465 465 'default_inherit_default_permissions': inherit_default_permissions,
466 466 '_method': 'put',
467 467 'csrf_token': self.csrf_token,
468 468 }
469 469 response = self.app.post(
470 470 url('edit_user_global_perms', user_id=uid),
471 471 params=perm_params)
472 472
473 473 if expect_form_error:
474 474 assert response.status_int == 200
475 475 response.mustcontain('Value must be one of')
476 476 else:
477 477 if expect_error:
478 478 msg = 'An error occurred during permissions saving'
479 479 else:
480 480 msg = 'User global permissions updated successfully'
481 481 ug = User.get(uid)
482 482 del perm_params['_method']
483 483 del perm_params['inherit_default_permissions']
484 484 del perm_params['csrf_token']
485 485 assert perm_params == ug.get_default_perms()
486 486 assert_session_flash(response, msg)
487 487 fixture.destroy_user(uid)
488 488
489 489 def test_global_permissions_initial_values(self, user_util):
490 490 self.log_user()
491 491 user = user_util.create_user()
492 492 uid = user.user_id
493 493 response = self.app.get(url('edit_user_global_perms', user_id=uid))
494 494 default_user = User.get_default_user()
495 495 default_permissions = default_user.get_default_perms()
496 496 assert_response = AssertResponse(response)
497 497 expected_permissions = (
498 498 'default_repo_create', 'default_repo_create_on_write',
499 499 'default_fork_create', 'default_repo_group_create',
500 500 'default_user_group_create', 'default_inherit_default_permissions')
501 501 for permission in expected_permissions:
502 502 css_selector = '[name={}][checked=checked]'.format(permission)
503 503 element = assert_response.get_element(css_selector)
504 504 assert element.value == default_permissions[permission]
505 505
506 506 def test_ips(self):
507 507 self.log_user()
508 508 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
509 509 response = self.app.get(url('edit_user_ips', user_id=user.user_id))
510 510 response.mustcontain('All IP addresses are allowed')
511 511
512 512 @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
513 513 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
514 514 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
515 515 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
516 516 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
517 517 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
518 518 ('127_bad_ip', 'foobar', 'foobar', True),
519 519 ])
520 520 def test_add_ip(self, test_name, ip, ip_range, failure):
521 521 self.log_user()
522 522 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
523 523 user_id = user.user_id
524 524
525 525 response = self.app.post(url('edit_user_ips', user_id=user_id),
526 526 params={'new_ip': ip, '_method': 'put',
527 527 'csrf_token': self.csrf_token})
528 528
529 529 if failure:
530 530 assert_session_flash(
531 531 response, 'Please enter a valid IPv4 or IpV6 address')
532 532 response = self.app.get(url('edit_user_ips', user_id=user_id))
533 533 response.mustcontain(no=[ip])
534 534 response.mustcontain(no=[ip_range])
535 535
536 536 else:
537 537 response = self.app.get(url('edit_user_ips', user_id=user_id))
538 538 response.mustcontain(ip)
539 539 response.mustcontain(ip_range)
540 540
541 541 # cleanup
542 542 for del_ip in UserIpMap.query().filter(
543 543 UserIpMap.user_id == user_id).all():
544 544 Session().delete(del_ip)
545 545 Session().commit()
546 546
547 547 def test_delete_ip(self):
548 548 self.log_user()
549 549 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
550 550 user_id = user.user_id
551 551 ip = '127.0.0.1/32'
552 552 ip_range = '127.0.0.1 - 127.0.0.1'
553 553 new_ip = UserModel().add_extra_ip(user_id, ip)
554 554 Session().commit()
555 555 new_ip_id = new_ip.ip_id
556 556
557 557 response = self.app.get(url('edit_user_ips', user_id=user_id))
558 558 response.mustcontain(ip)
559 559 response.mustcontain(ip_range)
560 560
561 561 self.app.post(url('edit_user_ips', user_id=user_id),
562 562 params={'_method': 'delete', 'del_ip_id': new_ip_id,
563 563 'csrf_token': self.csrf_token})
564 564
565 565 response = self.app.get(url('edit_user_ips', user_id=user_id))
566 566 response.mustcontain('All IP addresses are allowed')
567 567 response.mustcontain(no=[ip])
568 568 response.mustcontain(no=[ip_range])
569 569
570 570 def test_auth_tokens(self):
571 571 self.log_user()
572 572
573 573 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
574 574 response = self.app.get(
575 575 url('edit_user_auth_tokens', user_id=user.user_id))
576 576 response.mustcontain(user.api_key)
577 577 response.mustcontain('expires: never')
578 578
579 579 @pytest.mark.parametrize("desc, lifetime", [
580 580 ('forever', -1),
581 581 ('5mins', 60*5),
582 582 ('30days', 60*60*24*30),
583 583 ])
584 584 def test_add_auth_token(self, desc, lifetime):
585 585 self.log_user()
586 586 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
587 587 user_id = user.user_id
588 588
589 589 response = self.app.post(
590 590 url('edit_user_auth_tokens', user_id=user_id),
591 591 {'_method': 'put', 'description': desc, 'lifetime': lifetime,
592 592 'csrf_token': self.csrf_token})
593 593 assert_session_flash(response, 'Auth token successfully created')
594 594 try:
595 595 response = response.follow()
596 596 user = User.get(user_id)
597 597 for auth_token in user.auth_tokens:
598 598 response.mustcontain(auth_token)
599 599 finally:
600 600 for api_key in UserApiKeys.query().filter(
601 601 UserApiKeys.user_id == user_id).all():
602 602 Session().delete(api_key)
603 603 Session().commit()
604 604
605 605 def test_remove_auth_token(self):
606 606 self.log_user()
607 607 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
608 608 user_id = user.user_id
609 609
610 610 response = self.app.post(
611 611 url('edit_user_auth_tokens', user_id=user_id),
612 612 {'_method': 'put', 'description': 'desc', 'lifetime': -1,
613 613 'csrf_token': self.csrf_token})
614 614 assert_session_flash(response, 'Auth token successfully created')
615 615 response = response.follow()
616 616
617 617 # now delete our key
618 618 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
619 619 assert 1 == len(keys)
620 620
621 621 response = self.app.post(
622 622 url('edit_user_auth_tokens', user_id=user_id),
623 623 {'_method': 'delete', 'del_auth_token': keys[0].api_key,
624 624 'csrf_token': self.csrf_token})
625 625 assert_session_flash(response, 'Auth token successfully deleted')
626 626 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
627 627 assert 0 == len(keys)
628
629 def test_reset_main_auth_token(self):
630 self.log_user()
631 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
632 user_id = user.user_id
633 api_key = user.api_key
634 response = self.app.get(url('edit_user_auth_tokens', user_id=user_id))
635 response.mustcontain(api_key)
636 response.mustcontain('expires: never')
637
638 response = self.app.post(
639 url('edit_user_auth_tokens', user_id=user_id),
640 {'_method': 'delete', 'del_auth_token_builtin': api_key,
641 'csrf_token': self.csrf_token})
642 assert_session_flash(response, 'Auth token successfully reset')
643 response = response.follow()
644 response.mustcontain(no=[api_key])
General Comments 0
You need to be logged in to leave comments. Login now