##// END OF EJS Templates
fix(2fa): fixed typo from refactor
super-admin -
r5375:7ec0fbd3 default
parent child Browse files
Show More
@@ -1,553 +1,553 b''
1 1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import time
20 20 import json
21 21 import pyotp
22 22 import qrcode
23 23 import collections
24 24 import datetime
25 25 import formencode
26 26 import formencode.htmlfill
27 27 import logging
28 28 import urllib.parse
29 29 import requests
30 30 from io import BytesIO
31 31 from base64 import b64encode
32 32
33 33 from pyramid.renderers import render
34 34 from pyramid.response import Response
35 35 from pyramid.httpexceptions import HTTPFound
36 36
37 37 import rhodecode
38 38 from rhodecode.apps._base import BaseAppView
39 39 from rhodecode.authentication.base import authenticate, HTTP_TYPE
40 40 from rhodecode.authentication.plugins import auth_rhodecode
41 41 from rhodecode.events import UserRegistered, trigger
42 42 from rhodecode.lib import helpers as h
43 43 from rhodecode.lib import audit_logger
44 44 from rhodecode.lib.auth import (
45 45 AuthUser, HasPermissionAnyDecorator, CSRFRequired, LoginRequired, NotAnonymous)
46 46 from rhodecode.lib.base import get_ip_addr
47 47 from rhodecode.lib.exceptions import UserCreationError
48 48 from rhodecode.lib.utils2 import safe_str
49 49 from rhodecode.model.db import User, UserApiKeys
50 50 from rhodecode.model.forms import LoginForm, RegisterForm, PasswordResetForm, TOTPForm
51 51 from rhodecode.model.meta import Session
52 52 from rhodecode.model.auth_token import AuthTokenModel
53 53 from rhodecode.model.settings import SettingsModel
54 54 from rhodecode.model.user import UserModel
55 55 from rhodecode.translation import _
56 56
57 57
58 58 log = logging.getLogger(__name__)
59 59
60 60 CaptchaData = collections.namedtuple(
61 61 'CaptchaData', 'active, private_key, public_key')
62 62
63 63
64 64 def store_user_in_session(session, user_identifier, remember=False):
65 65 user = User.get_by_username_or_primary_email(user_identifier)
66 66 auth_user = AuthUser(user.user_id)
67 67 auth_user.set_authenticated()
68 68 cs = auth_user.get_cookie_store()
69 69 session['rhodecode_user'] = cs
70 70 user.update_lastlogin()
71 71 Session().commit()
72 72
73 73 # If they want to be remembered, update the cookie
74 74 if remember:
75 75 _year = (datetime.datetime.now() +
76 76 datetime.timedelta(seconds=60 * 60 * 24 * 365))
77 77 session._set_cookie_expires(_year)
78 78
79 79 session.save()
80 80
81 81 safe_cs = cs.copy()
82 82 safe_cs['password'] = '****'
83 83 log.info('user %s is now authenticated and stored in '
84 84 'session, session attrs %s', user_identifier, safe_cs)
85 85
86 86 # dumps session attrs back to cookie
87 87 session._update_cookie_out()
88 88 # we set new cookie
89 89 headers = None
90 90 if session.request['set_cookie']:
91 91 # send set-cookie headers back to response to update cookie
92 92 headers = [('Set-Cookie', session.request['cookie_out'])]
93 93 return headers
94 94
95 95
96 96 def get_came_from(request):
97 97 came_from = safe_str(request.GET.get('came_from', ''))
98 98 parsed = urllib.parse.urlparse(came_from)
99 99
100 100 allowed_schemes = ['http', 'https']
101 101 default_came_from = h.route_path('home')
102 102 if parsed.scheme and parsed.scheme not in allowed_schemes:
103 103 log.error('Suspicious URL scheme detected %s for url %s',
104 104 parsed.scheme, parsed)
105 105 came_from = default_came_from
106 106 elif parsed.netloc and request.host != parsed.netloc:
107 107 log.error('Suspicious NETLOC detected %s for url %s server url '
108 108 'is: %s', parsed.netloc, parsed, request.host)
109 109 came_from = default_came_from
110 110 elif any(bad_char in came_from for bad_char in ('\r', '\n')):
111 111 log.error('Header injection detected `%s` for url %s server url ',
112 112 parsed.path, parsed)
113 113 came_from = default_came_from
114 114
115 115 return came_from or default_came_from
116 116
117 117
118 118 class LoginView(BaseAppView):
119 119
120 120 def load_default_context(self):
121 121 c = self._get_local_tmpl_context()
122 122 c.came_from = get_came_from(self.request)
123 123 return c
124 124
125 125 def _get_captcha_data(self):
126 126 settings = SettingsModel().get_all_settings()
127 127 private_key = settings.get('rhodecode_captcha_private_key')
128 128 public_key = settings.get('rhodecode_captcha_public_key')
129 129 active = bool(private_key)
130 130 return CaptchaData(
131 131 active=active, private_key=private_key, public_key=public_key)
132 132
133 133 def validate_captcha(self, private_key):
134 134
135 135 captcha_rs = self.request.POST.get('g-recaptcha-response')
136 136 url = "https://www.google.com/recaptcha/api/siteverify"
137 137 params = {
138 138 'secret': private_key,
139 139 'response': captcha_rs,
140 140 'remoteip': get_ip_addr(self.request.environ)
141 141 }
142 142 verify_rs = requests.get(url, params=params, verify=True, timeout=60)
143 143 verify_rs = verify_rs.json()
144 144 captcha_status = verify_rs.get('success', False)
145 145 captcha_errors = verify_rs.get('error-codes', [])
146 146 if not isinstance(captcha_errors, list):
147 147 captcha_errors = [captcha_errors]
148 148 captcha_errors = ', '.join(captcha_errors)
149 149 captcha_message = ''
150 150 if captcha_status is False:
151 151 captcha_message = "Bad captcha. Errors: {}".format(
152 152 captcha_errors)
153 153
154 154 return captcha_status, captcha_message
155 155
156 156 def login(self):
157 157 c = self.load_default_context()
158 158 auth_user = self._rhodecode_user
159 159
160 160 # redirect if already logged in
161 161 if (auth_user.is_authenticated and
162 162 not auth_user.is_default and auth_user.ip_allowed):
163 163 raise HTTPFound(c.came_from)
164 164
165 165 # check if we use headers plugin, and try to login using it.
166 166 try:
167 167 log.debug('Running PRE-AUTH for headers based authentication')
168 168 auth_info = authenticate(
169 169 '', '', self.request.environ, HTTP_TYPE, skip_missing=True)
170 170 if auth_info:
171 171 headers = store_user_in_session(
172 172 self.session, auth_info.get('username'))
173 173 raise HTTPFound(c.came_from, headers=headers)
174 174 except UserCreationError as e:
175 175 log.error(e)
176 176 h.flash(e, category='error')
177 177
178 178 return self._get_template_context(c)
179 179
180 180 def login_post(self):
181 181 c = self.load_default_context()
182 182
183 183 login_form = LoginForm(self.request.translate)()
184 184
185 185 try:
186 186 self.session.invalidate()
187 187 form_result = login_form.to_python(self.request.POST)
188 188 # form checks for username/password, now we're authenticated
189 189 username = form_result['username']
190 190 if (user := User.get_by_username_or_primary_email(username)).has_enabled_2fa:
191 191 user.check_2fa_required = True
192 192
193 193 headers = store_user_in_session(
194 194 self.session,
195 195 user_identifier=username,
196 196 remember=form_result['remember'])
197 197 log.debug('Redirecting to "%s" after login.', c.came_from)
198 198
199 199 audit_user = audit_logger.UserWrap(
200 200 username=self.request.POST.get('username'),
201 201 ip_addr=self.request.remote_addr)
202 202 action_data = {'user_agent': self.request.user_agent}
203 203 audit_logger.store_web(
204 204 'user.login.success', action_data=action_data,
205 205 user=audit_user, commit=True)
206 206
207 207 raise HTTPFound(c.came_from, headers=headers)
208 208 except formencode.Invalid as errors:
209 209 defaults = errors.value
210 210 # remove password from filling in form again
211 211 defaults.pop('password', None)
212 212 render_ctx = {
213 213 'errors': errors.error_dict,
214 214 'defaults': defaults,
215 215 }
216 216
217 217 audit_user = audit_logger.UserWrap(
218 218 username=self.request.POST.get('username'),
219 219 ip_addr=self.request.remote_addr)
220 220 action_data = {'user_agent': self.request.user_agent}
221 221 audit_logger.store_web(
222 222 'user.login.failure', action_data=action_data,
223 223 user=audit_user, commit=True)
224 224 return self._get_template_context(c, **render_ctx)
225 225
226 226 except UserCreationError as e:
227 227 # headers auth or other auth functions that create users on
228 228 # the fly can throw this exception signaling that there's issue
229 229 # with user creation, explanation should be provided in
230 230 # Exception itself
231 231 h.flash(e, category='error')
232 232 return self._get_template_context(c)
233 233
234 234 @CSRFRequired()
235 235 def logout(self):
236 236 auth_user = self._rhodecode_user
237 237 log.info('Deleting session for user: `%s`', auth_user)
238 238
239 239 action_data = {'user_agent': self.request.user_agent}
240 240 audit_logger.store_web(
241 241 'user.logout', action_data=action_data,
242 242 user=auth_user, commit=True)
243 243 self.session.delete()
244 244 return HTTPFound(h.route_path('home'))
245 245
246 246 @HasPermissionAnyDecorator(
247 247 'hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate')
248 248 def register(self, defaults=None, errors=None):
249 249 c = self.load_default_context()
250 250 defaults = defaults or {}
251 251 errors = errors or {}
252 252
253 253 settings = SettingsModel().get_all_settings()
254 254 register_message = settings.get('rhodecode_register_message') or ''
255 255 captcha = self._get_captcha_data()
256 256 auto_active = 'hg.register.auto_activate' in User.get_default_user()\
257 257 .AuthUser().permissions['global']
258 258
259 259 render_ctx = self._get_template_context(c)
260 260 render_ctx.update({
261 261 'defaults': defaults,
262 262 'errors': errors,
263 263 'auto_active': auto_active,
264 264 'captcha_active': captcha.active,
265 265 'captcha_public_key': captcha.public_key,
266 266 'register_message': register_message,
267 267 })
268 268 return render_ctx
269 269
270 270 @HasPermissionAnyDecorator(
271 271 'hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate')
272 272 def register_post(self):
273 273 from rhodecode.authentication.plugins import auth_rhodecode
274 274
275 275 self.load_default_context()
276 276 captcha = self._get_captcha_data()
277 277 auto_active = 'hg.register.auto_activate' in User.get_default_user()\
278 278 .AuthUser().permissions['global']
279 279
280 280 extern_name = auth_rhodecode.RhodeCodeAuthPlugin.uid
281 281 extern_type = auth_rhodecode.RhodeCodeAuthPlugin.uid
282 282
283 283 register_form = RegisterForm(self.request.translate)()
284 284 try:
285 285
286 286 form_result = register_form.to_python(self.request.POST)
287 287 form_result['active'] = auto_active
288 288 external_identity = self.request.POST.get('external_identity')
289 289
290 290 if external_identity:
291 291 extern_name = external_identity
292 292 extern_type = external_identity
293 293
294 294 if captcha.active:
295 295 captcha_status, captcha_message = self.validate_captcha(
296 296 captcha.private_key)
297 297
298 298 if not captcha_status:
299 299 _value = form_result
300 300 _msg = _('Bad captcha')
301 301 error_dict = {'recaptcha_field': captcha_message}
302 302 raise formencode.Invalid(
303 303 _msg, _value, None, error_dict=error_dict)
304 304
305 305 new_user = UserModel().create_registration(
306 306 form_result, extern_name=extern_name, extern_type=extern_type)
307 307
308 308 action_data = {'data': new_user.get_api_data(),
309 309 'user_agent': self.request.user_agent}
310 310
311 311 if external_identity:
312 312 action_data['external_identity'] = external_identity
313 313
314 314 audit_user = audit_logger.UserWrap(
315 315 username=new_user.username,
316 316 user_id=new_user.user_id,
317 317 ip_addr=self.request.remote_addr)
318 318
319 319 audit_logger.store_web(
320 320 'user.register', action_data=action_data,
321 321 user=audit_user)
322 322
323 323 event = UserRegistered(user=new_user, session=self.session)
324 324 trigger(event)
325 325 h.flash(
326 326 _('You have successfully registered with RhodeCode. You can log-in now.'),
327 327 category='success')
328 328 if external_identity:
329 329 h.flash(
330 330 _('Please use the {identity} button to log-in').format(
331 331 identity=external_identity),
332 332 category='success')
333 333 Session().commit()
334 334
335 335 redirect_ro = self.request.route_path('login')
336 336 raise HTTPFound(redirect_ro)
337 337
338 338 except formencode.Invalid as errors:
339 339 errors.value.pop('password', None)
340 340 errors.value.pop('password_confirmation', None)
341 341 return self.register(
342 342 defaults=errors.value, errors=errors.error_dict)
343 343
344 344 except UserCreationError as e:
345 345 # container auth or other auth functions that create users on
346 346 # the fly can throw this exception signaling that there's issue
347 347 # with user creation, explanation should be provided in
348 348 # Exception itself
349 349 h.flash(e, category='error')
350 350 return self.register()
351 351
352 352 def password_reset(self):
353 353 c = self.load_default_context()
354 354 captcha = self._get_captcha_data()
355 355
356 356 template_context = {
357 357 'captcha_active': captcha.active,
358 358 'captcha_public_key': captcha.public_key,
359 359 'defaults': {},
360 360 'errors': {},
361 361 }
362 362
363 363 # always send implicit message to prevent from discovery of
364 364 # matching emails
365 365 msg = _('If such email exists, a password reset link was sent to it.')
366 366
367 367 def default_response():
368 368 log.debug('faking response on invalid password reset')
369 369 # make this take 2s, to prevent brute forcing.
370 370 time.sleep(2)
371 371 h.flash(msg, category='success')
372 372 return HTTPFound(self.request.route_path('reset_password'))
373 373
374 374 if self.request.POST:
375 375 if h.HasPermissionAny('hg.password_reset.disabled')():
376 376 _email = self.request.POST.get('email', '')
377 377 log.error('Failed attempt to reset password for `%s`.', _email)
378 378 h.flash(_('Password reset has been disabled.'), category='error')
379 379 return HTTPFound(self.request.route_path('reset_password'))
380 380
381 381 password_reset_form = PasswordResetForm(self.request.translate)()
382 382 description = 'Generated token for password reset from {}'.format(
383 383 datetime.datetime.now().isoformat())
384 384
385 385 try:
386 386 form_result = password_reset_form.to_python(
387 387 self.request.POST)
388 388 user_email = form_result['email']
389 389
390 390 if captcha.active:
391 391 captcha_status, captcha_message = self.validate_captcha(
392 392 captcha.private_key)
393 393
394 394 if not captcha_status:
395 395 _value = form_result
396 396 _msg = _('Bad captcha')
397 397 error_dict = {'recaptcha_field': captcha_message}
398 398 raise formencode.Invalid(
399 399 _msg, _value, None, error_dict=error_dict)
400 400
401 401 # Generate reset URL and send mail.
402 402 user = User.get_by_email(user_email)
403 403
404 404 # only allow rhodecode based users to reset their password
405 405 # external auth shouldn't allow password reset
406 406 if user and user.extern_type != auth_rhodecode.RhodeCodeAuthPlugin.uid:
407 407 log.warning('User %s with external type `%s` tried a password reset. '
408 408 'This try was rejected', user, user.extern_type)
409 409 return default_response()
410 410
411 411 # generate password reset token that expires in 10 minutes
412 412 reset_token = UserModel().add_auth_token(
413 413 user=user, lifetime_minutes=10,
414 414 role=UserModel.auth_token_role.ROLE_PASSWORD_RESET,
415 415 description=description)
416 416 Session().commit()
417 417
418 418 log.debug('Successfully created password recovery token')
419 419 password_reset_url = self.request.route_url(
420 420 'reset_password_confirmation',
421 421 _query={'key': reset_token.api_key})
422 422 UserModel().reset_password_link(
423 423 form_result, password_reset_url)
424 424
425 425 action_data = {'email': user_email,
426 426 'user_agent': self.request.user_agent}
427 427 audit_logger.store_web(
428 428 'user.password.reset_request', action_data=action_data,
429 429 user=self._rhodecode_user, commit=True)
430 430
431 431 return default_response()
432 432
433 433 except formencode.Invalid as errors:
434 434 template_context.update({
435 435 'defaults': errors.value,
436 436 'errors': errors.error_dict,
437 437 })
438 438 if not self.request.POST.get('email'):
439 439 # case of empty email, we want to report that
440 440 return self._get_template_context(c, **template_context)
441 441
442 442 if 'recaptcha_field' in errors.error_dict:
443 443 # case of failed captcha
444 444 return self._get_template_context(c, **template_context)
445 445
446 446 return default_response()
447 447
448 448 return self._get_template_context(c, **template_context)
449 449
450 450 def password_reset_confirmation(self):
451 451 self.load_default_context()
452 452
453 453 if key := self.request.GET.get('key'):
454 454 # make this take 2s, to prevent brute forcing.
455 455 time.sleep(2)
456 456
457 457 token = AuthTokenModel().get_auth_token(key)
458 458
459 459 # verify token is the correct role
460 460 if token is None or token.role != UserApiKeys.ROLE_PASSWORD_RESET:
461 461 log.debug('Got token with role:%s expected is %s',
462 462 getattr(token, 'role', 'EMPTY_TOKEN'),
463 463 UserApiKeys.ROLE_PASSWORD_RESET)
464 464 h.flash(
465 465 _('Given reset token is invalid'), category='error')
466 466 return HTTPFound(self.request.route_path('reset_password'))
467 467
468 468 try:
469 469 owner = token.user
470 470 data = {'email': owner.email, 'token': token.api_key}
471 471 UserModel().reset_password(data)
472 472 h.flash(
473 473 _('Your password reset was successful, '
474 474 'a new password has been sent to your email'),
475 475 category='success')
476 476 except Exception as e:
477 477 log.error(e)
478 478 return HTTPFound(self.request.route_path('reset_password'))
479 479
480 480 return HTTPFound(self.request.route_path('login'))
481 481
482 482 @LoginRequired()
483 483 @NotAnonymous()
484 484 def setup_2fa(self):
485 485 _ = self.request.translate
486 486 c = self.load_default_context()
487 487 user_instance = self._rhodecode_db_user
488 488 form = TOTPForm(_, user_instance)()
489 489 render_ctx = {}
490 490 if self.request.method == 'POST':
491 491 post_items = dict(self.request.POST)
492 492
493 493 try:
494 494 form_details = form.to_python(post_items)
495 495 secret = form_details['secret_totp']
496 496
497 497 user_instance.init_2fa_recovery_codes(persist=True, force=True)
498 user_instance.2fa_secret = secret
498 user_instance.secret_2fa = secret
499 499
500 500 Session().commit()
501 501 raise HTTPFound(self.request.route_path('my_account_configure_2fa', _query={'show-recovery-codes': 1}))
502 502 except formencode.Invalid as errors:
503 503 defaults = errors.value
504 504 render_ctx = {
505 505 'errors': errors.error_dict,
506 506 'defaults': defaults,
507 507 }
508 508
509 509 # NOTE: here we DO NOT persist the secret 2FA, since this is only for setup, once a setup is completed
510 510 # only then we should persist it
511 511 secret = user_instance.init_secret_2fa(persist=False)
512 512
513 513 instance_name = rhodecode.ConfigGet().get_str('app.base_url', 'rhodecode')
514 514 totp_name = f'{instance_name}:{self.request.user.username}'
515 515
516 516 qr = qrcode.QRCode(version=1, box_size=5, border=4)
517 517 qr.add_data(pyotp.totp.TOTP(secret).provisioning_uri(name=totp_name))
518 518 qr.make(fit=True)
519 519 img = qr.make_image(fill_color='black', back_color='white')
520 520 buffered = BytesIO()
521 521 img.save(buffered)
522 522 return self._get_template_context(
523 523 c,
524 524 qr=b64encode(buffered.getvalue()).decode("utf-8"),
525 525 key=secret,
526 526 totp_name=totp_name,
527 527 ** render_ctx
528 528 )
529 529
530 530 @LoginRequired()
531 531 @NotAnonymous()
532 532 def verify_2fa(self):
533 533 _ = self.request.translate
534 534 c = self.load_default_context()
535 535 render_ctx = {}
536 536 user_instance = self._rhodecode_db_user
537 537 totp_form = TOTPForm(_, user_instance, allow_recovery_code_use=True)()
538 538 if self.request.method == 'POST':
539 539 post_items = dict(self.request.POST)
540 540 # NOTE: inject secret, as it's a post configured saved item.
541 541 post_items['secret_totp'] = user_instance.secret_2fa
542 542 try:
543 543 totp_form.to_python(post_items)
544 544 user_instance.check_2fa_required = False
545 545 Session().commit()
546 546 raise HTTPFound(c.came_from)
547 547 except formencode.Invalid as errors:
548 548 defaults = errors.value
549 549 render_ctx = {
550 550 'errors': errors.error_dict,
551 551 'defaults': defaults,
552 552 }
553 553 return self._get_template_context(c, **render_ctx)
General Comments 0
You need to be logged in to leave comments. Login now