##// END OF EJS Templates
password-reset: strengthten security on password reset logic....
marcink -
r1471:9ea7077d default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,106 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import pytest
22
23 from rhodecode.config.routing import ADMIN_PREFIX
24 from rhodecode.tests import (
25 TestController, clear_all_caches, url,
26 TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
27 from rhodecode.tests.fixture import Fixture
28 from rhodecode.tests.utils import AssertResponse
29
30 fixture = Fixture()
31
32 # Hardcode URLs because we don't have a request object to use
33 # pyramids URL generation methods.
34 index_url = '/'
35 login_url = ADMIN_PREFIX + '/login'
36 logut_url = ADMIN_PREFIX + '/logout'
37 register_url = ADMIN_PREFIX + '/register'
38 pwd_reset_url = ADMIN_PREFIX + '/password_reset'
39 pwd_reset_confirm_url = ADMIN_PREFIX + '/password_reset_confirmation'
40
41
42 class TestPasswordReset(TestController):
43
44 @pytest.mark.parametrize(
45 'pwd_reset_setting, show_link, show_reset', [
46 ('hg.password_reset.enabled', True, True),
47 ('hg.password_reset.hidden', False, True),
48 ('hg.password_reset.disabled', False, False),
49 ])
50 def test_password_reset_settings(
51 self, pwd_reset_setting, show_link, show_reset):
52 clear_all_caches()
53 self.log_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
54 params = {
55 'csrf_token': self.csrf_token,
56 'anonymous': 'True',
57 'default_register': 'hg.register.auto_activate',
58 'default_register_message': '',
59 'default_password_reset': pwd_reset_setting,
60 'default_extern_activate': 'hg.extern_activate.auto',
61 }
62 resp = self.app.post(url('admin_permissions_application'), params=params)
63 self.logout_user()
64
65 login_page = self.app.get(login_url)
66 asr_login = AssertResponse(login_page)
67 index_page = self.app.get(index_url)
68 asr_index = AssertResponse(index_page)
69
70 if show_link:
71 asr_login.one_element_exists('a.pwd_reset')
72 asr_index.one_element_exists('a.pwd_reset')
73 else:
74 asr_login.no_element_exists('a.pwd_reset')
75 asr_index.no_element_exists('a.pwd_reset')
76
77 response = self.app.get(pwd_reset_url)
78
79 assert_response = AssertResponse(response)
80 if show_reset:
81 response.mustcontain('Send password reset email')
82 assert_response.one_element_exists('#email')
83 assert_response.one_element_exists('#send')
84 else:
85 response.mustcontain('Password reset is disabled.')
86 assert_response.no_element_exists('#email')
87 assert_response.no_element_exists('#send')
88
89 def test_password_form_disabled(self):
90 self.log_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
91 params = {
92 'csrf_token': self.csrf_token,
93 'anonymous': 'True',
94 'default_register': 'hg.register.auto_activate',
95 'default_register_message': '',
96 'default_password_reset': 'hg.password_reset.disabled',
97 'default_extern_activate': 'hg.extern_activate.auto',
98 }
99 self.app.post(url('admin_permissions_application'), params=params)
100 self.logout_user()
101
102 response = self.app.post(
103 pwd_reset_url, {'email': 'lisa@rhodecode.com',}
104 )
105 response = response.follow()
106 response.mustcontain('Password reset is disabled.')
@@ -1,358 +1,390 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import time
21 22 import collections
22 23 import datetime
23 24 import formencode
24 25 import logging
25 26 import urlparse
26 27
27 28 from pylons import url
28 29 from pyramid.httpexceptions import HTTPFound
29 30 from pyramid.view import view_config
30 31 from recaptcha.client.captcha import submit
31 32
32 33 from rhodecode.authentication.base import authenticate, HTTP_TYPE
33 34 from rhodecode.events import UserRegistered
34 35 from rhodecode.lib import helpers as h
35 36 from rhodecode.lib.auth import (
36 37 AuthUser, HasPermissionAnyDecorator, CSRFRequired)
37 38 from rhodecode.lib.base import get_ip_addr
38 39 from rhodecode.lib.exceptions import UserCreationError
39 40 from rhodecode.lib.utils2 import safe_str
40 from rhodecode.model.db import User
41 from rhodecode.model.db import User, UserApiKeys
41 42 from rhodecode.model.forms import LoginForm, RegisterForm, PasswordResetForm
42 43 from rhodecode.model.meta import Session
44 from rhodecode.model.auth_token import AuthTokenModel
43 45 from rhodecode.model.settings import SettingsModel
44 46 from rhodecode.model.user import UserModel
45 47 from rhodecode.translation import _
46 48
47 49
48 50 log = logging.getLogger(__name__)
49 51
50 52 CaptchaData = collections.namedtuple(
51 53 'CaptchaData', 'active, private_key, public_key')
52 54
53 55
54 56 def _store_user_in_session(session, username, remember=False):
55 57 user = User.get_by_username(username, case_insensitive=True)
56 58 auth_user = AuthUser(user.user_id)
57 59 auth_user.set_authenticated()
58 60 cs = auth_user.get_cookie_store()
59 61 session['rhodecode_user'] = cs
60 62 user.update_lastlogin()
61 63 Session().commit()
62 64
63 65 # If they want to be remembered, update the cookie
64 66 if remember:
65 67 _year = (datetime.datetime.now() +
66 68 datetime.timedelta(seconds=60 * 60 * 24 * 365))
67 69 session._set_cookie_expires(_year)
68 70
69 71 session.save()
70 72
71 73 safe_cs = cs.copy()
72 74 safe_cs['password'] = '****'
73 75 log.info('user %s is now authenticated and stored in '
74 76 'session, session attrs %s', username, safe_cs)
75 77
76 78 # dumps session attrs back to cookie
77 79 session._update_cookie_out()
78 80 # we set new cookie
79 81 headers = None
80 82 if session.request['set_cookie']:
81 83 # send set-cookie headers back to response to update cookie
82 84 headers = [('Set-Cookie', session.request['cookie_out'])]
83 85 return headers
84 86
85 87
86 88 def get_came_from(request):
87 89 came_from = safe_str(request.GET.get('came_from', ''))
88 90 parsed = urlparse.urlparse(came_from)
89 91 allowed_schemes = ['http', 'https']
90 92 if parsed.scheme and parsed.scheme not in allowed_schemes:
91 93 log.error('Suspicious URL scheme detected %s for url %s' %
92 94 (parsed.scheme, parsed))
93 95 came_from = url('home')
94 96 elif parsed.netloc and request.host != parsed.netloc:
95 97 log.error('Suspicious NETLOC detected %s for url %s server url '
96 98 'is: %s' % (parsed.netloc, parsed, request.host))
97 99 came_from = url('home')
98 100 elif any(bad_str in parsed.path for bad_str in ('\r', '\n')):
99 101 log.error('Header injection detected `%s` for url %s server url ' %
100 102 (parsed.path, parsed))
101 103 came_from = url('home')
102 104
103 105 return came_from or url('home')
104 106
105 107
106 108 class LoginView(object):
107 109
108 110 def __init__(self, context, request):
109 111 self.request = request
110 112 self.context = context
111 113 self.session = request.session
112 114 self._rhodecode_user = request.user
113 115
114 116 def _get_template_context(self):
115 117 return {
116 118 'came_from': get_came_from(self.request),
117 119 'defaults': {},
118 120 'errors': {},
119 121 }
120 122
121 123 def _get_captcha_data(self):
122 124 settings = SettingsModel().get_all_settings()
123 125 private_key = settings.get('rhodecode_captcha_private_key')
124 126 public_key = settings.get('rhodecode_captcha_public_key')
125 127 active = bool(private_key)
126 128 return CaptchaData(
127 129 active=active, private_key=private_key, public_key=public_key)
128 130
129 131 @view_config(
130 132 route_name='login', request_method='GET',
131 133 renderer='rhodecode:templates/login.mako')
132 134 def login(self):
133 135 came_from = get_came_from(self.request)
134 136 user = self.request.user
135 137
136 138 # redirect if already logged in
137 139 if user.is_authenticated and not user.is_default and user.ip_allowed:
138 140 raise HTTPFound(came_from)
139 141
140 142 # check if we use headers plugin, and try to login using it.
141 143 try:
142 144 log.debug('Running PRE-AUTH for headers based authentication')
143 145 auth_info = authenticate(
144 146 '', '', self.request.environ, HTTP_TYPE, skip_missing=True)
145 147 if auth_info:
146 148 headers = _store_user_in_session(
147 149 self.session, auth_info.get('username'))
148 150 raise HTTPFound(came_from, headers=headers)
149 151 except UserCreationError as e:
150 152 log.error(e)
151 153 self.session.flash(e, queue='error')
152 154
153 155 return self._get_template_context()
154 156
155 157 @view_config(
156 158 route_name='login', request_method='POST',
157 159 renderer='rhodecode:templates/login.mako')
158 160 def login_post(self):
159 161 came_from = get_came_from(self.request)
160 162
161 163 login_form = LoginForm()()
162 164
163 165 try:
164 166 self.session.invalidate()
165 167 form_result = login_form.to_python(self.request.params)
166 168 # form checks for username/password, now we're authenticated
167 169 headers = _store_user_in_session(
168 170 self.session,
169 171 username=form_result['username'],
170 172 remember=form_result['remember'])
171 173 log.debug('Redirecting to "%s" after login.', came_from)
172 174 raise HTTPFound(came_from, headers=headers)
173 175 except formencode.Invalid as errors:
174 176 defaults = errors.value
175 177 # remove password from filling in form again
176 178 defaults.pop('password', None)
177 179 render_ctx = self._get_template_context()
178 180 render_ctx.update({
179 181 'errors': errors.error_dict,
180 182 'defaults': defaults,
181 183 })
182 184 return render_ctx
183 185
184 186 except UserCreationError as e:
185 187 # headers auth or other auth functions that create users on
186 188 # the fly can throw this exception signaling that there's issue
187 189 # with user creation, explanation should be provided in
188 190 # Exception itself
189 191 self.session.flash(e, queue='error')
190 192 return self._get_template_context()
191 193
192 194 @CSRFRequired()
193 195 @view_config(route_name='logout', request_method='POST')
194 196 def logout(self):
195 197 user = self.request.user
196 198 log.info('Deleting session for user: `%s`', user)
197 199 self.session.delete()
198 200 return HTTPFound(url('home'))
199 201
200 202 @HasPermissionAnyDecorator(
201 203 'hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate')
202 204 @view_config(
203 205 route_name='register', request_method='GET',
204 206 renderer='rhodecode:templates/register.mako',)
205 207 def register(self, defaults=None, errors=None):
206 208 defaults = defaults or {}
207 209 errors = errors or {}
208 210
209 211 settings = SettingsModel().get_all_settings()
210 212 register_message = settings.get('rhodecode_register_message') or ''
211 213 captcha = self._get_captcha_data()
212 214 auto_active = 'hg.register.auto_activate' in User.get_default_user()\
213 215 .AuthUser.permissions['global']
214 216
215 217 render_ctx = self._get_template_context()
216 218 render_ctx.update({
217 219 'defaults': defaults,
218 220 'errors': errors,
219 221 'auto_active': auto_active,
220 222 'captcha_active': captcha.active,
221 223 'captcha_public_key': captcha.public_key,
222 224 'register_message': register_message,
223 225 })
224 226 return render_ctx
225 227
226 228 @HasPermissionAnyDecorator(
227 229 'hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate')
228 230 @view_config(
229 231 route_name='register', request_method='POST',
230 232 renderer='rhodecode:templates/register.mako')
231 233 def register_post(self):
232 234 captcha = self._get_captcha_data()
233 235 auto_active = 'hg.register.auto_activate' in User.get_default_user()\
234 236 .AuthUser.permissions['global']
235 237
236 238 register_form = RegisterForm()()
237 239 try:
238 240 form_result = register_form.to_python(self.request.params)
239 241 form_result['active'] = auto_active
240 242
241 243 if captcha.active:
242 244 response = submit(
243 245 self.request.params.get('recaptcha_challenge_field'),
244 246 self.request.params.get('recaptcha_response_field'),
245 247 private_key=captcha.private_key,
246 248 remoteip=get_ip_addr(self.request.environ))
247 249 if not response.is_valid:
248 250 _value = form_result
249 251 _msg = _('Bad captcha')
250 252 error_dict = {'recaptcha_field': _msg}
251 253 raise formencode.Invalid(_msg, _value, None,
252 254 error_dict=error_dict)
253 255
254 256 new_user = UserModel().create_registration(form_result)
255 257 event = UserRegistered(user=new_user, session=self.session)
256 258 self.request.registry.notify(event)
257 259 self.session.flash(
258 260 _('You have successfully registered with RhodeCode'),
259 261 queue='success')
260 262 Session().commit()
261 263
262 264 redirect_ro = self.request.route_path('login')
263 265 raise HTTPFound(redirect_ro)
264 266
265 267 except formencode.Invalid as errors:
266 268 errors.value.pop('password', None)
267 269 errors.value.pop('password_confirmation', None)
268 270 return self.register(
269 271 defaults=errors.value, errors=errors.error_dict)
270 272
271 273 except UserCreationError as e:
272 274 # container auth or other auth functions that create users on
273 275 # the fly can throw this exception signaling that there's issue
274 276 # with user creation, explanation should be provided in
275 277 # Exception itself
276 278 self.session.flash(e, queue='error')
277 279 return self.register()
278 280
279 281 @view_config(
280 282 route_name='reset_password', request_method=('GET', 'POST'),
281 283 renderer='rhodecode:templates/password_reset.mako')
282 284 def password_reset(self):
283 285 captcha = self._get_captcha_data()
284 286
285 287 render_ctx = {
286 288 'captcha_active': captcha.active,
287 289 'captcha_public_key': captcha.public_key,
288 290 'defaults': {},
289 291 'errors': {},
290 292 }
291 293
294 # always send implicit message to prevent from discovery of
295 # matching emails
296 msg = _('If such email exists, a password reset link was sent to it.')
297
292 298 if self.request.POST:
299 if h.HasPermissionAny('hg.password_reset.disabled')():
300 _email = self.request.POST.get('email', '')
301 log.error('Failed attempt to reset password for `%s`.', _email)
302 self.session.flash(_('Password reset has been disabled.'),
303 queue='error')
304 return HTTPFound(self.request.route_path('reset_password'))
305
293 306 password_reset_form = PasswordResetForm()()
294 307 try:
295 308 form_result = password_reset_form.to_python(
296 309 self.request.params)
297 if h.HasPermissionAny('hg.password_reset.disabled')():
298 log.error('Failed attempt to reset password for %s.', form_result['email'] )
299 self.session.flash(
300 _('Password reset has been disabled.'),
301 queue='error')
302 return HTTPFound(self.request.route_path('reset_password'))
310 user_email = form_result['email']
311
303 312 if captcha.active:
304 313 response = submit(
305 314 self.request.params.get('recaptcha_challenge_field'),
306 315 self.request.params.get('recaptcha_response_field'),
307 316 private_key=captcha.private_key,
308 317 remoteip=get_ip_addr(self.request.environ))
309 318 if not response.is_valid:
310 319 _value = form_result
311 320 _msg = _('Bad captcha')
312 321 error_dict = {'recaptcha_field': _msg}
313 raise formencode.Invalid(_msg, _value, None,
314 error_dict=error_dict)
322 raise formencode.Invalid(
323 _msg, _value, None, error_dict=error_dict)
324 # Generate reset URL and send mail.
325 user = User.get_by_email(user_email)
315 326
316 # Generate reset URL and send mail.
317 user_email = form_result['email']
318 user = User.get_by_email(user_email)
327 # generate password reset token that expires in 10minutes
328 desc = 'Generated token for password reset from {}'.format(
329 datetime.datetime.now().isoformat())
330 reset_token = AuthTokenModel().create(
331 user, lifetime=10,
332 description=desc,
333 role=UserApiKeys.ROLE_PASSWORD_RESET)
334 Session().commit()
335
336 log.debug('Successfully created password recovery token')
319 337 password_reset_url = self.request.route_url(
320 338 'reset_password_confirmation',
321 _query={'key': user.api_key})
339 _query={'key': reset_token.api_key})
322 340 UserModel().reset_password_link(
323 341 form_result, password_reset_url)
324
325 342 # Display success message and redirect.
326 self.session.flash(
327 _('Your password reset link was sent'),
328 queue='success')
329 return HTTPFound(self.request.route_path('login'))
343 self.session.flash(msg, queue='success')
344 return HTTPFound(self.request.route_path('reset_password'))
330 345
331 346 except formencode.Invalid as errors:
332 347 render_ctx.update({
333 348 'defaults': errors.value,
334 'errors': errors.error_dict,
335 349 })
350 log.debug('faking response on invalid password reset')
351 # make this take 2s, to prevent brute forcing.
352 time.sleep(2)
353 self.session.flash(msg, queue='success')
354 return HTTPFound(self.request.route_path('reset_password'))
336 355
337 356 return render_ctx
338 357
339 358 @view_config(route_name='reset_password_confirmation',
340 359 request_method='GET')
341 360 def password_reset_confirmation(self):
361
342 362 if self.request.GET and self.request.GET.get('key'):
363 # make this take 2s, to prevent brute forcing.
364 time.sleep(2)
365
366 token = AuthTokenModel().get_auth_token(
367 self.request.GET.get('key'))
368
369 # verify token is the correct role
370 if token is None or token.role != UserApiKeys.ROLE_PASSWORD_RESET:
371 log.debug('Got token with role:%s expected is %s',
372 getattr(token, 'role', 'EMPTY_TOKEN'),
373 UserApiKeys.ROLE_PASSWORD_RESET)
374 self.session.flash(
375 _('Given reset token is invalid'), queue='error')
376 return HTTPFound(self.request.route_path('reset_password'))
377
343 378 try:
344 user = User.get_by_auth_token(self.request.GET.get('key'))
345 password_reset_url = self.request.route_url(
346 'reset_password_confirmation',
347 _query={'key': user.api_key})
348 data = {'email': user.email}
349 UserModel().reset_password(data, password_reset_url)
379 owner = token.user
380 data = {'email': owner.email, 'token': token.api_key}
381 UserModel().reset_password(data)
350 382 self.session.flash(
351 383 _('Your password reset was successful, '
352 384 'a new password has been sent to your email'),
353 385 queue='success')
354 386 except Exception as e:
355 387 log.error(e)
356 388 return HTTPFound(self.request.route_path('reset_password'))
357 389
358 390 return HTTPFound(self.request.route_path('login'))
@@ -1,87 +1,97 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2013-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 authentication tokens model for RhodeCode
23 23 """
24 24
25 25 import time
26 26 import logging
27 27 import traceback
28 28 from sqlalchemy import or_
29 29
30 30 from rhodecode.model import BaseModel
31 31 from rhodecode.model.db import UserApiKeys
32 32 from rhodecode.model.meta import Session
33 33
34 34 log = logging.getLogger(__name__)
35 35
36 36
37 37 class AuthTokenModel(BaseModel):
38 38 cls = UserApiKeys
39 39
40 40 def create(self, user, description, lifetime=-1, role=UserApiKeys.ROLE_ALL):
41 41 """
42 42 :param user: user or user_id
43 43 :param description: description of ApiKey
44 :param lifetime: expiration time in seconds
44 :param lifetime: expiration time in minutes
45 45 :param role: role for the apikey
46 46 """
47 47 from rhodecode.lib.auth import generate_auth_token
48 48
49 49 user = self._get_user(user)
50 50
51 51 new_auth_token = UserApiKeys()
52 52 new_auth_token.api_key = generate_auth_token(user.username)
53 53 new_auth_token.user_id = user.user_id
54 54 new_auth_token.description = description
55 55 new_auth_token.role = role
56 56 new_auth_token.expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
57 57 Session().add(new_auth_token)
58 58
59 59 return new_auth_token
60 60
61 61 def delete(self, api_key, user=None):
62 62 """
63 63 Deletes given api_key, if user is set it also filters the object for
64 64 deletion by given user.
65 65 """
66 66 api_key = UserApiKeys.query().filter(UserApiKeys.api_key == api_key)
67 67
68 68 if user:
69 69 user = self._get_user(user)
70 70 api_key = api_key.filter(UserApiKeys.user_id == user.user_id)
71 71
72 72 api_key = api_key.scalar()
73 73 try:
74 74 Session().delete(api_key)
75 75 except Exception:
76 76 log.error(traceback.format_exc())
77 77 raise
78 78
79 79 def get_auth_tokens(self, user, show_expired=True):
80 80 user = self._get_user(user)
81 81 user_auth_tokens = UserApiKeys.query()\
82 82 .filter(UserApiKeys.user_id == user.user_id)
83 83 if not show_expired:
84 84 user_auth_tokens = user_auth_tokens\
85 85 .filter(or_(UserApiKeys.expires == -1,
86 86 UserApiKeys.expires >= time.time()))
87 87 return user_auth_tokens
88
89 def get_auth_token(self, auth_token):
90 auth_token = UserApiKeys.query().filter(
91 UserApiKeys.api_key == auth_token)
92 auth_token = auth_token \
93 .filter(or_(UserApiKeys.expires == -1,
94 UserApiKeys.expires >= time.time()))\
95 .first()
96
97 return auth_token
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,845 +1,852 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 users model for RhodeCode
23 23 """
24 24
25 25 import logging
26 26 import traceback
27 27
28 28 import datetime
29 29 from pylons.i18n.translation import _
30 30
31 31 import ipaddress
32 32 from sqlalchemy.exc import DatabaseError
33 33 from sqlalchemy.sql.expression import true, false
34 34
35 35 from rhodecode import events
36 36 from rhodecode.lib.utils2 import (
37 37 safe_unicode, get_current_rhodecode_user, action_logger_generic,
38 38 AttributeDict, str2bool)
39 39 from rhodecode.lib.caching_query import FromCache
40 40 from rhodecode.model import BaseModel
41 41 from rhodecode.model.auth_token import AuthTokenModel
42 42 from rhodecode.model.db import (
43 43 User, UserToPerm, UserEmailMap, UserIpMap)
44 44 from rhodecode.lib.exceptions import (
45 45 DefaultUserException, UserOwnsReposException, UserOwnsRepoGroupsException,
46 46 UserOwnsUserGroupsException, NotAllowedToCreateUserError)
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.repo_group import RepoGroupModel
49 49
50 50
51 51 log = logging.getLogger(__name__)
52 52
53 53
54 54 class UserModel(BaseModel):
55 55 cls = User
56 56
57 57 def get(self, user_id, cache=False):
58 58 user = self.sa.query(User)
59 59 if cache:
60 60 user = user.options(FromCache("sql_cache_short",
61 61 "get_user_%s" % user_id))
62 62 return user.get(user_id)
63 63
64 64 def get_user(self, user):
65 65 return self._get_user(user)
66 66
67 67 def get_by_username(self, username, cache=False, case_insensitive=False):
68 68
69 69 if case_insensitive:
70 70 user = self.sa.query(User).filter(User.username.ilike(username))
71 71 else:
72 72 user = self.sa.query(User)\
73 73 .filter(User.username == username)
74 74 if cache:
75 75 user = user.options(FromCache("sql_cache_short",
76 76 "get_user_%s" % username))
77 77 return user.scalar()
78 78
79 79 def get_by_email(self, email, cache=False, case_insensitive=False):
80 80 return User.get_by_email(email, case_insensitive, cache)
81 81
82 82 def get_by_auth_token(self, auth_token, cache=False):
83 83 return User.get_by_auth_token(auth_token, cache)
84 84
85 85 def get_active_user_count(self, cache=False):
86 86 return User.query().filter(
87 87 User.active == True).filter(
88 88 User.username != User.DEFAULT_USER).count()
89 89
90 90 def create(self, form_data, cur_user=None):
91 91 if not cur_user:
92 92 cur_user = getattr(get_current_rhodecode_user(), 'username', None)
93 93
94 94 user_data = {
95 95 'username': form_data['username'],
96 96 'password': form_data['password'],
97 97 'email': form_data['email'],
98 98 'firstname': form_data['firstname'],
99 99 'lastname': form_data['lastname'],
100 100 'active': form_data['active'],
101 101 'extern_type': form_data['extern_type'],
102 102 'extern_name': form_data['extern_name'],
103 103 'admin': False,
104 104 'cur_user': cur_user
105 105 }
106 106
107 107 if 'create_repo_group' in form_data:
108 108 user_data['create_repo_group'] = str2bool(
109 109 form_data.get('create_repo_group'))
110 110
111 111 try:
112 112 if form_data.get('password_change'):
113 113 user_data['force_password_change'] = True
114 114 return UserModel().create_or_update(**user_data)
115 115 except Exception:
116 116 log.error(traceback.format_exc())
117 117 raise
118 118
119 119 def update_user(self, user, skip_attrs=None, **kwargs):
120 120 from rhodecode.lib.auth import get_crypt_password
121 121
122 122 user = self._get_user(user)
123 123 if user.username == User.DEFAULT_USER:
124 124 raise DefaultUserException(
125 125 _("You can't Edit this user since it's"
126 126 " crucial for entire application"))
127 127
128 128 # first store only defaults
129 129 user_attrs = {
130 130 'updating_user_id': user.user_id,
131 131 'username': user.username,
132 132 'password': user.password,
133 133 'email': user.email,
134 134 'firstname': user.name,
135 135 'lastname': user.lastname,
136 136 'active': user.active,
137 137 'admin': user.admin,
138 138 'extern_name': user.extern_name,
139 139 'extern_type': user.extern_type,
140 140 'language': user.user_data.get('language')
141 141 }
142 142
143 143 # in case there's new_password, that comes from form, use it to
144 144 # store password
145 145 if kwargs.get('new_password'):
146 146 kwargs['password'] = kwargs['new_password']
147 147
148 148 # cleanups, my_account password change form
149 149 kwargs.pop('current_password', None)
150 150 kwargs.pop('new_password', None)
151 151
152 152 # cleanups, user edit password change form
153 153 kwargs.pop('password_confirmation', None)
154 154 kwargs.pop('password_change', None)
155 155
156 156 # create repo group on user creation
157 157 kwargs.pop('create_repo_group', None)
158 158
159 159 # legacy forms send name, which is the firstname
160 160 firstname = kwargs.pop('name', None)
161 161 if firstname:
162 162 kwargs['firstname'] = firstname
163 163
164 164 for k, v in kwargs.items():
165 165 # skip if we don't want to update this
166 166 if skip_attrs and k in skip_attrs:
167 167 continue
168 168
169 169 user_attrs[k] = v
170 170
171 171 try:
172 172 return self.create_or_update(**user_attrs)
173 173 except Exception:
174 174 log.error(traceback.format_exc())
175 175 raise
176 176
177 177 def create_or_update(
178 178 self, username, password, email, firstname='', lastname='',
179 179 active=True, admin=False, extern_type=None, extern_name=None,
180 180 cur_user=None, plugin=None, force_password_change=False,
181 181 allow_to_create_user=True, create_repo_group=None,
182 182 updating_user_id=None, language=None, strict_creation_check=True):
183 183 """
184 184 Creates a new instance if not found, or updates current one
185 185
186 186 :param username:
187 187 :param password:
188 188 :param email:
189 189 :param firstname:
190 190 :param lastname:
191 191 :param active:
192 192 :param admin:
193 193 :param extern_type:
194 194 :param extern_name:
195 195 :param cur_user:
196 196 :param plugin: optional plugin this method was called from
197 197 :param force_password_change: toggles new or existing user flag
198 198 for password change
199 199 :param allow_to_create_user: Defines if the method can actually create
200 200 new users
201 201 :param create_repo_group: Defines if the method should also
202 202 create an repo group with user name, and owner
203 203 :param updating_user_id: if we set it up this is the user we want to
204 204 update this allows to editing username.
205 205 :param language: language of user from interface.
206 206
207 207 :returns: new User object with injected `is_new_user` attribute.
208 208 """
209 209 if not cur_user:
210 210 cur_user = getattr(get_current_rhodecode_user(), 'username', None)
211 211
212 212 from rhodecode.lib.auth import (
213 213 get_crypt_password, check_password, generate_auth_token)
214 214 from rhodecode.lib.hooks_base import (
215 215 log_create_user, check_allowed_create_user)
216 216
217 217 def _password_change(new_user, password):
218 218 # empty password
219 219 if not new_user.password:
220 220 return False
221 221
222 222 # password check is only needed for RhodeCode internal auth calls
223 223 # in case it's a plugin we don't care
224 224 if not plugin:
225 225
226 226 # first check if we gave crypted password back, and if it
227 227 # matches it's not password change
228 228 if new_user.password == password:
229 229 return False
230 230
231 231 password_match = check_password(password, new_user.password)
232 232 if not password_match:
233 233 return True
234 234
235 235 return False
236 236
237 237 # read settings on default personal repo group creation
238 238 if create_repo_group is None:
239 239 default_create_repo_group = RepoGroupModel()\
240 240 .get_default_create_personal_repo_group()
241 241 create_repo_group = default_create_repo_group
242 242
243 243 user_data = {
244 244 'username': username,
245 245 'password': password,
246 246 'email': email,
247 247 'firstname': firstname,
248 248 'lastname': lastname,
249 249 'active': active,
250 250 'admin': admin
251 251 }
252 252
253 253 if updating_user_id:
254 254 log.debug('Checking for existing account in RhodeCode '
255 255 'database with user_id `%s` ' % (updating_user_id,))
256 256 user = User.get(updating_user_id)
257 257 else:
258 258 log.debug('Checking for existing account in RhodeCode '
259 259 'database with username `%s` ' % (username,))
260 260 user = User.get_by_username(username, case_insensitive=True)
261 261
262 262 if user is None:
263 263 # we check internal flag if this method is actually allowed to
264 264 # create new user
265 265 if not allow_to_create_user:
266 266 msg = ('Method wants to create new user, but it is not '
267 267 'allowed to do so')
268 268 log.warning(msg)
269 269 raise NotAllowedToCreateUserError(msg)
270 270
271 271 log.debug('Creating new user %s', username)
272 272
273 273 # only if we create user that is active
274 274 new_active_user = active
275 275 if new_active_user and strict_creation_check:
276 276 # raises UserCreationError if it's not allowed for any reason to
277 277 # create new active user, this also executes pre-create hooks
278 278 check_allowed_create_user(user_data, cur_user, strict_check=True)
279 279 events.trigger(events.UserPreCreate(user_data))
280 280 new_user = User()
281 281 edit = False
282 282 else:
283 283 log.debug('updating user %s', username)
284 284 events.trigger(events.UserPreUpdate(user, user_data))
285 285 new_user = user
286 286 edit = True
287 287
288 288 # we're not allowed to edit default user
289 289 if user.username == User.DEFAULT_USER:
290 290 raise DefaultUserException(
291 291 _("You can't edit this user (`%(username)s`) since it's "
292 292 "crucial for entire application") % {'username': user.username})
293 293
294 294 # inject special attribute that will tell us if User is new or old
295 295 new_user.is_new_user = not edit
296 296 # for users that didn's specify auth type, we use RhodeCode built in
297 297 from rhodecode.authentication.plugins import auth_rhodecode
298 298 extern_name = extern_name or auth_rhodecode.RhodeCodeAuthPlugin.name
299 299 extern_type = extern_type or auth_rhodecode.RhodeCodeAuthPlugin.name
300 300
301 301 try:
302 302 new_user.username = username
303 303 new_user.admin = admin
304 304 new_user.email = email
305 305 new_user.active = active
306 306 new_user.extern_name = safe_unicode(extern_name)
307 307 new_user.extern_type = safe_unicode(extern_type)
308 308 new_user.name = firstname
309 309 new_user.lastname = lastname
310 310
311 311 if not edit:
312 312 new_user.api_key = generate_auth_token(username)
313 313
314 314 # set password only if creating an user or password is changed
315 315 if not edit or _password_change(new_user, password):
316 316 reason = 'new password' if edit else 'new user'
317 317 log.debug('Updating password reason=>%s', reason)
318 318 new_user.password = get_crypt_password(password) if password else None
319 319
320 320 if force_password_change:
321 321 new_user.update_userdata(force_password_change=True)
322 322 if language:
323 323 new_user.update_userdata(language=language)
324 324 new_user.update_userdata(notification_status=True)
325 325
326 326 self.sa.add(new_user)
327 327
328 328 if not edit and create_repo_group:
329 329 RepoGroupModel().create_personal_repo_group(
330 330 new_user, commit_early=False)
331 331
332 332 if not edit:
333 333 # add the RSS token
334 334 AuthTokenModel().create(username,
335 335 description='Generated feed token',
336 336 role=AuthTokenModel.cls.ROLE_FEED)
337 337 log_create_user(created_by=cur_user, **new_user.get_dict())
338 338 events.trigger(events.UserPostCreate(user_data))
339 339 return new_user
340 340 except (DatabaseError,):
341 341 log.error(traceback.format_exc())
342 342 raise
343 343
344 344 def create_registration(self, form_data):
345 345 from rhodecode.model.notification import NotificationModel
346 346 from rhodecode.model.notification import EmailNotificationModel
347 347
348 348 try:
349 349 form_data['admin'] = False
350 350 form_data['extern_name'] = 'rhodecode'
351 351 form_data['extern_type'] = 'rhodecode'
352 352 new_user = self.create(form_data)
353 353
354 354 self.sa.add(new_user)
355 355 self.sa.flush()
356 356
357 357 user_data = new_user.get_dict()
358 358 kwargs = {
359 359 # use SQLALCHEMY safe dump of user data
360 360 'user': AttributeDict(user_data),
361 361 'date': datetime.datetime.now()
362 362 }
363 363 notification_type = EmailNotificationModel.TYPE_REGISTRATION
364 364 # pre-generate the subject for notification itself
365 365 (subject,
366 366 _h, _e, # we don't care about those
367 367 body_plaintext) = EmailNotificationModel().render_email(
368 368 notification_type, **kwargs)
369 369
370 370 # create notification objects, and emails
371 371 NotificationModel().create(
372 372 created_by=new_user,
373 373 notification_subject=subject,
374 374 notification_body=body_plaintext,
375 375 notification_type=notification_type,
376 376 recipients=None, # all admins
377 377 email_kwargs=kwargs,
378 378 )
379 379
380 380 return new_user
381 381 except Exception:
382 382 log.error(traceback.format_exc())
383 383 raise
384 384
385 385 def _handle_user_repos(self, username, repositories, handle_mode=None):
386 386 _superadmin = self.cls.get_first_super_admin()
387 387 left_overs = True
388 388
389 389 from rhodecode.model.repo import RepoModel
390 390
391 391 if handle_mode == 'detach':
392 392 for obj in repositories:
393 393 obj.user = _superadmin
394 394 # set description we know why we super admin now owns
395 395 # additional repositories that were orphaned !
396 396 obj.description += ' \n::detached repository from deleted user: %s' % (username,)
397 397 self.sa.add(obj)
398 398 left_overs = False
399 399 elif handle_mode == 'delete':
400 400 for obj in repositories:
401 401 RepoModel().delete(obj, forks='detach')
402 402 left_overs = False
403 403
404 404 # if nothing is done we have left overs left
405 405 return left_overs
406 406
407 407 def _handle_user_repo_groups(self, username, repository_groups,
408 408 handle_mode=None):
409 409 _superadmin = self.cls.get_first_super_admin()
410 410 left_overs = True
411 411
412 412 from rhodecode.model.repo_group import RepoGroupModel
413 413
414 414 if handle_mode == 'detach':
415 415 for r in repository_groups:
416 416 r.user = _superadmin
417 417 # set description we know why we super admin now owns
418 418 # additional repositories that were orphaned !
419 419 r.group_description += ' \n::detached repository group from deleted user: %s' % (username,)
420 420 self.sa.add(r)
421 421 left_overs = False
422 422 elif handle_mode == 'delete':
423 423 for r in repository_groups:
424 424 RepoGroupModel().delete(r)
425 425 left_overs = False
426 426
427 427 # if nothing is done we have left overs left
428 428 return left_overs
429 429
430 430 def _handle_user_user_groups(self, username, user_groups, handle_mode=None):
431 431 _superadmin = self.cls.get_first_super_admin()
432 432 left_overs = True
433 433
434 434 from rhodecode.model.user_group import UserGroupModel
435 435
436 436 if handle_mode == 'detach':
437 437 for r in user_groups:
438 438 for user_user_group_to_perm in r.user_user_group_to_perm:
439 439 if user_user_group_to_perm.user.username == username:
440 440 user_user_group_to_perm.user = _superadmin
441 441 r.user = _superadmin
442 442 # set description we know why we super admin now owns
443 443 # additional repositories that were orphaned !
444 444 r.user_group_description += ' \n::detached user group from deleted user: %s' % (username,)
445 445 self.sa.add(r)
446 446 left_overs = False
447 447 elif handle_mode == 'delete':
448 448 for r in user_groups:
449 449 UserGroupModel().delete(r)
450 450 left_overs = False
451 451
452 452 # if nothing is done we have left overs left
453 453 return left_overs
454 454
455 455 def delete(self, user, cur_user=None, handle_repos=None,
456 456 handle_repo_groups=None, handle_user_groups=None):
457 457 if not cur_user:
458 458 cur_user = getattr(get_current_rhodecode_user(), 'username', None)
459 459 user = self._get_user(user)
460 460
461 461 try:
462 462 if user.username == User.DEFAULT_USER:
463 463 raise DefaultUserException(
464 464 _(u"You can't remove this user since it's"
465 465 u" crucial for entire application"))
466 466
467 467 left_overs = self._handle_user_repos(
468 468 user.username, user.repositories, handle_repos)
469 469 if left_overs and user.repositories:
470 470 repos = [x.repo_name for x in user.repositories]
471 471 raise UserOwnsReposException(
472 472 _(u'user "%s" still owns %s repositories and cannot be '
473 473 u'removed. Switch owners or remove those repositories:%s')
474 474 % (user.username, len(repos), ', '.join(repos)))
475 475
476 476 left_overs = self._handle_user_repo_groups(
477 477 user.username, user.repository_groups, handle_repo_groups)
478 478 if left_overs and user.repository_groups:
479 479 repo_groups = [x.group_name for x in user.repository_groups]
480 480 raise UserOwnsRepoGroupsException(
481 481 _(u'user "%s" still owns %s repository groups and cannot be '
482 482 u'removed. Switch owners or remove those repository groups:%s')
483 483 % (user.username, len(repo_groups), ', '.join(repo_groups)))
484 484
485 485 left_overs = self._handle_user_user_groups(
486 486 user.username, user.user_groups, handle_user_groups)
487 487 if left_overs and user.user_groups:
488 488 user_groups = [x.users_group_name for x in user.user_groups]
489 489 raise UserOwnsUserGroupsException(
490 490 _(u'user "%s" still owns %s user groups and cannot be '
491 491 u'removed. Switch owners or remove those user groups:%s')
492 492 % (user.username, len(user_groups), ', '.join(user_groups)))
493 493
494 494 # we might change the user data with detach/delete, make sure
495 495 # the object is marked as expired before actually deleting !
496 496 self.sa.expire(user)
497 497 self.sa.delete(user)
498 498 from rhodecode.lib.hooks_base import log_delete_user
499 499 log_delete_user(deleted_by=cur_user, **user.get_dict())
500 500 except Exception:
501 501 log.error(traceback.format_exc())
502 502 raise
503 503
504 504 def reset_password_link(self, data, pwd_reset_url):
505 505 from rhodecode.lib.celerylib import tasks, run_task
506 506 from rhodecode.model.notification import EmailNotificationModel
507 507 user_email = data['email']
508 508 try:
509 509 user = User.get_by_email(user_email)
510 510 if user:
511 511 log.debug('password reset user found %s', user)
512 512
513 513 email_kwargs = {
514 514 'password_reset_url': pwd_reset_url,
515 515 'user': user,
516 516 'email': user_email,
517 517 'date': datetime.datetime.now()
518 518 }
519 519
520 520 (subject, headers, email_body,
521 521 email_body_plaintext) = EmailNotificationModel().render_email(
522 522 EmailNotificationModel.TYPE_PASSWORD_RESET, **email_kwargs)
523 523
524 524 recipients = [user_email]
525 525
526 526 action_logger_generic(
527 527 'sending password reset email to user: {}'.format(
528 528 user), namespace='security.password_reset')
529 529
530 530 run_task(tasks.send_email, recipients, subject,
531 531 email_body_plaintext, email_body)
532 532
533 533 else:
534 534 log.debug("password reset email %s not found", user_email)
535 535 except Exception:
536 536 log.error(traceback.format_exc())
537 537 return False
538 538
539 539 return True
540 540
541 def reset_password(self, data, pwd_reset_url):
541 def reset_password(self, data):
542 542 from rhodecode.lib.celerylib import tasks, run_task
543 543 from rhodecode.model.notification import EmailNotificationModel
544 544 from rhodecode.lib import auth
545 545 user_email = data['email']
546 546 pre_db = True
547 547 try:
548 548 user = User.get_by_email(user_email)
549 549 new_passwd = auth.PasswordGenerator().gen_password(
550 550 12, auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
551 551 if user:
552 552 user.password = auth.get_crypt_password(new_passwd)
553 553 # also force this user to reset his password !
554 554 user.update_userdata(force_password_change=True)
555 555
556 556 Session().add(user)
557
558 # now delete the token in question
559 UserApiKeys = AuthTokenModel.cls
560 UserApiKeys().query().filter(
561 UserApiKeys.api_key == data['token']).delete()
562
557 563 Session().commit()
558 564 log.info('successfully reset password for `%s`', user_email)
565
559 566 if new_passwd is None:
560 567 raise Exception('unable to generate new password')
561 568
562 569 pre_db = False
563 570
564 571 email_kwargs = {
565 572 'new_password': new_passwd,
566 'password_reset_url': pwd_reset_url,
567 573 'user': user,
568 574 'email': user_email,
569 575 'date': datetime.datetime.now()
570 576 }
571 577
572 578 (subject, headers, email_body,
573 579 email_body_plaintext) = EmailNotificationModel().render_email(
574 EmailNotificationModel.TYPE_PASSWORD_RESET_CONFIRMATION, **email_kwargs)
580 EmailNotificationModel.TYPE_PASSWORD_RESET_CONFIRMATION,
581 **email_kwargs)
575 582
576 583 recipients = [user_email]
577 584
578 585 action_logger_generic(
579 586 'sent new password to user: {} with email: {}'.format(
580 587 user, user_email), namespace='security.password_reset')
581 588
582 589 run_task(tasks.send_email, recipients, subject,
583 590 email_body_plaintext, email_body)
584 591
585 592 except Exception:
586 593 log.error('Failed to update user password')
587 594 log.error(traceback.format_exc())
588 595 if pre_db:
589 596 # we rollback only if local db stuff fails. If it goes into
590 597 # run_task, we're pass rollback state this wouldn't work then
591 598 Session().rollback()
592 599
593 600 return True
594 601
595 602 def fill_data(self, auth_user, user_id=None, api_key=None, username=None):
596 603 """
597 604 Fetches auth_user by user_id,or api_key if present.
598 605 Fills auth_user attributes with those taken from database.
599 606 Additionally set's is_authenitated if lookup fails
600 607 present in database
601 608
602 609 :param auth_user: instance of user to set attributes
603 610 :param user_id: user id to fetch by
604 611 :param api_key: api key to fetch by
605 612 :param username: username to fetch by
606 613 """
607 614 if user_id is None and api_key is None and username is None:
608 615 raise Exception('You need to pass user_id, api_key or username')
609 616
610 617 log.debug(
611 618 'doing fill data based on: user_id:%s api_key:%s username:%s',
612 619 user_id, api_key, username)
613 620 try:
614 621 dbuser = None
615 622 if user_id:
616 623 dbuser = self.get(user_id)
617 624 elif api_key:
618 625 dbuser = self.get_by_auth_token(api_key)
619 626 elif username:
620 627 dbuser = self.get_by_username(username)
621 628
622 629 if not dbuser:
623 630 log.warning(
624 631 'Unable to lookup user by id:%s api_key:%s username:%s',
625 632 user_id, api_key, username)
626 633 return False
627 634 if not dbuser.active:
628 635 log.debug('User `%s` is inactive, skipping fill data', username)
629 636 return False
630 637
631 638 log.debug('filling user:%s data', dbuser)
632 639
633 640 # TODO: johbo: Think about this and find a clean solution
634 641 user_data = dbuser.get_dict()
635 642 user_data.update(dbuser.get_api_data(include_secrets=True))
636 643
637 644 for k, v in user_data.iteritems():
638 645 # properties of auth user we dont update
639 646 if k not in ['auth_tokens', 'permissions']:
640 647 setattr(auth_user, k, v)
641 648
642 649 # few extras
643 650 setattr(auth_user, 'feed_token', dbuser.feed_token)
644 651 except Exception:
645 652 log.error(traceback.format_exc())
646 653 auth_user.is_authenticated = False
647 654 return False
648 655
649 656 return True
650 657
651 658 def has_perm(self, user, perm):
652 659 perm = self._get_perm(perm)
653 660 user = self._get_user(user)
654 661
655 662 return UserToPerm.query().filter(UserToPerm.user == user)\
656 663 .filter(UserToPerm.permission == perm).scalar() is not None
657 664
658 665 def grant_perm(self, user, perm):
659 666 """
660 667 Grant user global permissions
661 668
662 669 :param user:
663 670 :param perm:
664 671 """
665 672 user = self._get_user(user)
666 673 perm = self._get_perm(perm)
667 674 # if this permission is already granted skip it
668 675 _perm = UserToPerm.query()\
669 676 .filter(UserToPerm.user == user)\
670 677 .filter(UserToPerm.permission == perm)\
671 678 .scalar()
672 679 if _perm:
673 680 return
674 681 new = UserToPerm()
675 682 new.user = user
676 683 new.permission = perm
677 684 self.sa.add(new)
678 685 return new
679 686
680 687 def revoke_perm(self, user, perm):
681 688 """
682 689 Revoke users global permissions
683 690
684 691 :param user:
685 692 :param perm:
686 693 """
687 694 user = self._get_user(user)
688 695 perm = self._get_perm(perm)
689 696
690 697 obj = UserToPerm.query()\
691 698 .filter(UserToPerm.user == user)\
692 699 .filter(UserToPerm.permission == perm)\
693 700 .scalar()
694 701 if obj:
695 702 self.sa.delete(obj)
696 703
697 704 def add_extra_email(self, user, email):
698 705 """
699 706 Adds email address to UserEmailMap
700 707
701 708 :param user:
702 709 :param email:
703 710 """
704 711 from rhodecode.model import forms
705 712 form = forms.UserExtraEmailForm()()
706 713 data = form.to_python({'email': email})
707 714 user = self._get_user(user)
708 715
709 716 obj = UserEmailMap()
710 717 obj.user = user
711 718 obj.email = data['email']
712 719 self.sa.add(obj)
713 720 return obj
714 721
715 722 def delete_extra_email(self, user, email_id):
716 723 """
717 724 Removes email address from UserEmailMap
718 725
719 726 :param user:
720 727 :param email_id:
721 728 """
722 729 user = self._get_user(user)
723 730 obj = UserEmailMap.query().get(email_id)
724 731 if obj:
725 732 self.sa.delete(obj)
726 733
727 734 def parse_ip_range(self, ip_range):
728 735 ip_list = []
729 736 def make_unique(value):
730 737 seen = []
731 738 return [c for c in value if not (c in seen or seen.append(c))]
732 739
733 740 # firsts split by commas
734 741 for ip_range in ip_range.split(','):
735 742 if not ip_range:
736 743 continue
737 744 ip_range = ip_range.strip()
738 745 if '-' in ip_range:
739 746 start_ip, end_ip = ip_range.split('-', 1)
740 747 start_ip = ipaddress.ip_address(start_ip.strip())
741 748 end_ip = ipaddress.ip_address(end_ip.strip())
742 749 parsed_ip_range = []
743 750
744 751 for index in xrange(int(start_ip), int(end_ip) + 1):
745 752 new_ip = ipaddress.ip_address(index)
746 753 parsed_ip_range.append(str(new_ip))
747 754 ip_list.extend(parsed_ip_range)
748 755 else:
749 756 ip_list.append(ip_range)
750 757
751 758 return make_unique(ip_list)
752 759
753 760 def add_extra_ip(self, user, ip, description=None):
754 761 """
755 762 Adds ip address to UserIpMap
756 763
757 764 :param user:
758 765 :param ip:
759 766 """
760 767 from rhodecode.model import forms
761 768 form = forms.UserExtraIpForm()()
762 769 data = form.to_python({'ip': ip})
763 770 user = self._get_user(user)
764 771
765 772 obj = UserIpMap()
766 773 obj.user = user
767 774 obj.ip_addr = data['ip']
768 775 obj.description = description
769 776 self.sa.add(obj)
770 777 return obj
771 778
772 779 def delete_extra_ip(self, user, ip_id):
773 780 """
774 781 Removes ip address from UserIpMap
775 782
776 783 :param user:
777 784 :param ip_id:
778 785 """
779 786 user = self._get_user(user)
780 787 obj = UserIpMap.query().get(ip_id)
781 788 if obj:
782 789 self.sa.delete(obj)
783 790
784 791 def get_accounts_in_creation_order(self, current_user=None):
785 792 """
786 793 Get accounts in order of creation for deactivation for license limits
787 794
788 795 pick currently logged in user, and append to the list in position 0
789 796 pick all super-admins in order of creation date and add it to the list
790 797 pick all other accounts in order of creation and add it to the list.
791 798
792 799 Based on that list, the last accounts can be disabled as they are
793 800 created at the end and don't include any of the super admins as well
794 801 as the current user.
795 802
796 803 :param current_user: optionally current user running this operation
797 804 """
798 805
799 806 if not current_user:
800 807 current_user = get_current_rhodecode_user()
801 808 active_super_admins = [
802 809 x.user_id for x in User.query()
803 810 .filter(User.user_id != current_user.user_id)
804 811 .filter(User.active == true())
805 812 .filter(User.admin == true())
806 813 .order_by(User.created_on.asc())]
807 814
808 815 active_regular_users = [
809 816 x.user_id for x in User.query()
810 817 .filter(User.user_id != current_user.user_id)
811 818 .filter(User.active == true())
812 819 .filter(User.admin == false())
813 820 .order_by(User.created_on.asc())]
814 821
815 822 list_of_accounts = [current_user.user_id]
816 823 list_of_accounts += active_super_admins
817 824 list_of_accounts += active_regular_users
818 825
819 826 return list_of_accounts
820 827
821 828 def deactivate_last_users(self, expected_users):
822 829 """
823 830 Deactivate accounts that are over the license limits.
824 831 Algorithm of which accounts to disabled is based on the formula:
825 832
826 833 Get current user, then super admins in creation order, then regular
827 834 active users in creation order.
828 835
829 836 Using that list we mark all accounts from the end of it as inactive.
830 837 This way we block only latest created accounts.
831 838
832 839 :param expected_users: list of users in special order, we deactivate
833 840 the end N ammoun of users from that list
834 841 """
835 842
836 843 list_of_accounts = self.get_accounts_in_creation_order()
837 844
838 845 for acc_id in list_of_accounts[expected_users + 1:]:
839 846 user = User.get(acc_id)
840 847 log.info('Deactivating account %s for license unlock', user)
841 848 user.active = False
842 849 Session().add(user)
843 850 Session().commit()
844 851
845 852 return
@@ -1,31 +1,33 b''
1 1 ## -*- coding: utf-8 -*-
2 2 <%inherit file="base.mako"/>
3 3
4 4 <%def name="subject()" filter="n,trim">
5 5 RhodeCode Password reset
6 6 </%def>
7 7
8 8 ## plain text version of the email. Empty by default
9 9 <%def name="body_plaintext()" filter="n,trim">
10 10 Hi ${user.username},
11 11
12 12 There was a request to reset your password using the email address ${email} on ${h.format_date(date)}
13 13
14 14 *If you didn't do this, please contact your RhodeCode administrator.*
15 15
16 16 You can continue, and generate new password by clicking following URL:
17 17 ${password_reset_url}
18 18
19 This link will be active for 10 minutes.
19 20 ${self.plaintext_footer()}
20 21 </%def>
21 22
22 23 ## BODY GOES BELOW
23 24 <p>
24 25 Hello ${user.username},
25 26 </p><p>
26 27 There was a request to reset your password using the email address ${email} on ${h.format_date(date)}
27 28 <br/>
28 29 <strong>If you did not request a password reset, please contact your RhodeCode administrator.</strong>
29 30 </p><p>
30 31 <a href="${password_reset_url}">${_('Generate new password here')}.</a>
32 This link will be active for 10 minutes.
31 33 </p>
@@ -1,30 +1,29 b''
1 1 ## -*- coding: utf-8 -*-
2 2 <%inherit file="base.mako"/>
3 3
4 4 <%def name="subject()" filter="n,trim">
5 5 Your new RhodeCode password
6 6 </%def>
7 7
8 8 ## plain text version of the email. Empty by default
9 9 <%def name="body_plaintext()" filter="n,trim">
10 10 Hi ${user.username},
11 11
12 There was a request to reset your password using the email address ${email} on ${h.format_date(date)}
12 Below is your new access password for RhodeCode.
13 13
14 14 *If you didn't do this, please contact your RhodeCode administrator.*
15 15
16 You can continue, and generate new password by clicking following URL:
17 ${password_reset_url}
16 password: ${new_password}
18 17
19 18 ${self.plaintext_footer()}
20 19 </%def>
21 20
22 21 ## BODY GOES BELOW
23 22 <p>
24 23 Hello ${user.username},
25 24 </p><p>
26 25 Below is your new access password for RhodeCode.
27 26 <br/>
28 27 <strong>If you didn't request a new password, please contact your RhodeCode administrator.</strong>
29 28 </p>
30 <p>password: <input value='${new_password}'/></p>
29 <p>password: <pre>${new_password}</pre>
@@ -1,591 +1,510 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urlparse
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.config.routing import ADMIN_PREFIX
27 27 from rhodecode.tests import (
28 TestController, assert_session_flash, clear_all_caches, url,
29 HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
28 assert_session_flash, url, HG_REPO, TEST_USER_ADMIN_LOGIN)
30 29 from rhodecode.tests.fixture import Fixture
31 30 from rhodecode.tests.utils import AssertResponse, get_session_from_response
32 from rhodecode.lib.auth import check_password, generate_auth_token
33 from rhodecode.lib import helpers as h
31 from rhodecode.lib.auth import check_password
34 32 from rhodecode.model.auth_token import AuthTokenModel
35 33 from rhodecode.model import validators
36 from rhodecode.model.db import User, Notification
34 from rhodecode.model.db import User, Notification, UserApiKeys
37 35 from rhodecode.model.meta import Session
38 36
39 37 fixture = Fixture()
40 38
41 39 # Hardcode URLs because we don't have a request object to use
42 40 # pyramids URL generation methods.
43 41 index_url = '/'
44 42 login_url = ADMIN_PREFIX + '/login'
45 43 logut_url = ADMIN_PREFIX + '/logout'
46 44 register_url = ADMIN_PREFIX + '/register'
47 45 pwd_reset_url = ADMIN_PREFIX + '/password_reset'
48 46 pwd_reset_confirm_url = ADMIN_PREFIX + '/password_reset_confirmation'
49 47
50 48
51 49 @pytest.mark.usefixtures('app')
52 class TestLoginController:
50 class TestLoginController(object):
53 51 destroy_users = set()
54 52
55 53 @classmethod
56 54 def teardown_class(cls):
57 55 fixture.destroy_users(cls.destroy_users)
58 56
59 57 def teardown_method(self, method):
60 58 for n in Notification.query().all():
61 59 Session().delete(n)
62 60
63 61 Session().commit()
64 62 assert Notification.query().all() == []
65 63
66 64 def test_index(self):
67 65 response = self.app.get(login_url)
68 66 assert response.status == '200 OK'
69 67 # Test response...
70 68
71 69 def test_login_admin_ok(self):
72 70 response = self.app.post(login_url,
73 71 {'username': 'test_admin',
74 72 'password': 'test12'})
75 73 assert response.status == '302 Found'
76 74 session = get_session_from_response(response)
77 75 username = session['rhodecode_user'].get('username')
78 76 assert username == 'test_admin'
79 77 response = response.follow()
80 78 response.mustcontain('/%s' % HG_REPO)
81 79
82 80 def test_login_regular_ok(self):
83 81 response = self.app.post(login_url,
84 82 {'username': 'test_regular',
85 83 'password': 'test12'})
86 84
87 85 assert response.status == '302 Found'
88 86 session = get_session_from_response(response)
89 87 username = session['rhodecode_user'].get('username')
90 88 assert username == 'test_regular'
91 89 response = response.follow()
92 90 response.mustcontain('/%s' % HG_REPO)
93 91
94 92 def test_login_ok_came_from(self):
95 93 test_came_from = '/_admin/users?branch=stable'
96 94 _url = '{}?came_from={}'.format(login_url, test_came_from)
97 95 response = self.app.post(
98 96 _url, {'username': 'test_admin', 'password': 'test12'})
99 97 assert response.status == '302 Found'
100 98 assert 'branch=stable' in response.location
101 99 response = response.follow()
102 100
103 101 assert response.status == '200 OK'
104 102 response.mustcontain('Users administration')
105 103
106 104 def test_redirect_to_login_with_get_args(self):
107 105 with fixture.anon_access(False):
108 106 kwargs = {'branch': 'stable'}
109 107 response = self.app.get(
110 108 url('summary_home', repo_name=HG_REPO, **kwargs))
111 109 assert response.status == '302 Found'
112 110 response_query = urlparse.parse_qsl(response.location)
113 111 assert 'branch=stable' in response_query[0][1]
114 112
115 113 def test_login_form_with_get_args(self):
116 114 _url = '{}?came_from=/_admin/users,branch=stable'.format(login_url)
117 115 response = self.app.get(_url)
118 116 assert 'branch%3Dstable' in response.form.action
119 117
120 118 @pytest.mark.parametrize("url_came_from", [
121 119 'data:text/html,<script>window.alert("xss")</script>',
122 120 'mailto:test@rhodecode.org',
123 121 'file:///etc/passwd',
124 122 'ftp://some.ftp.server',
125 123 'http://other.domain',
126 124 '/\r\nX-Forwarded-Host: http://example.org',
127 125 ])
128 126 def test_login_bad_came_froms(self, url_came_from):
129 127 _url = '{}?came_from={}'.format(login_url, url_came_from)
130 128 response = self.app.post(
131 129 _url,
132 130 {'username': 'test_admin', 'password': 'test12'})
133 131 assert response.status == '302 Found'
134 132 response = response.follow()
135 133 assert response.status == '200 OK'
136 134 assert response.request.path == '/'
137 135
138 136 def test_login_short_password(self):
139 137 response = self.app.post(login_url,
140 138 {'username': 'test_admin',
141 139 'password': 'as'})
142 140 assert response.status == '200 OK'
143 141
144 142 response.mustcontain('Enter 3 characters or more')
145 143
146 144 def test_login_wrong_non_ascii_password(self, user_regular):
147 145 response = self.app.post(
148 146 login_url,
149 147 {'username': user_regular.username,
150 148 'password': u'invalid-non-asci\xe4'.encode('utf8')})
151 149
152 150 response.mustcontain('invalid user name')
153 151 response.mustcontain('invalid password')
154 152
155 153 def test_login_with_non_ascii_password(self, user_util):
156 154 password = u'valid-non-ascii\xe4'
157 155 user = user_util.create_user(password=password)
158 156 response = self.app.post(
159 157 login_url,
160 158 {'username': user.username,
161 159 'password': password.encode('utf-8')})
162 160 assert response.status_code == 302
163 161
164 162 def test_login_wrong_username_password(self):
165 163 response = self.app.post(login_url,
166 164 {'username': 'error',
167 165 'password': 'test12'})
168 166
169 167 response.mustcontain('invalid user name')
170 168 response.mustcontain('invalid password')
171 169
172 170 def test_login_admin_ok_password_migration(self, real_crypto_backend):
173 171 from rhodecode.lib import auth
174 172
175 173 # create new user, with sha256 password
176 174 temp_user = 'test_admin_sha256'
177 175 user = fixture.create_user(temp_user)
178 176 user.password = auth._RhodeCodeCryptoSha256().hash_create(
179 177 b'test123')
180 178 Session().add(user)
181 179 Session().commit()
182 180 self.destroy_users.add(temp_user)
183 181 response = self.app.post(login_url,
184 182 {'username': temp_user,
185 183 'password': 'test123'})
186 184
187 185 assert response.status == '302 Found'
188 186 session = get_session_from_response(response)
189 187 username = session['rhodecode_user'].get('username')
190 188 assert username == temp_user
191 189 response = response.follow()
192 190 response.mustcontain('/%s' % HG_REPO)
193 191
194 192 # new password should be bcrypted, after log-in and transfer
195 193 user = User.get_by_username(temp_user)
196 194 assert user.password.startswith('$')
197 195
198 196 # REGISTRATIONS
199 197 def test_register(self):
200 198 response = self.app.get(register_url)
201 199 response.mustcontain('Create an Account')
202 200
203 201 def test_register_err_same_username(self):
204 202 uname = 'test_admin'
205 203 response = self.app.post(
206 204 register_url,
207 205 {
208 206 'username': uname,
209 207 'password': 'test12',
210 208 'password_confirmation': 'test12',
211 209 'email': 'goodmail@domain.com',
212 210 'firstname': 'test',
213 211 'lastname': 'test'
214 212 }
215 213 )
216 214
217 215 assertr = AssertResponse(response)
218 216 msg = validators.ValidUsername()._messages['username_exists']
219 217 msg = msg % {'username': uname}
220 218 assertr.element_contains('#username+.error-message', msg)
221 219
222 220 def test_register_err_same_email(self):
223 221 response = self.app.post(
224 222 register_url,
225 223 {
226 224 'username': 'test_admin_0',
227 225 'password': 'test12',
228 226 'password_confirmation': 'test12',
229 227 'email': 'test_admin@mail.com',
230 228 'firstname': 'test',
231 229 'lastname': 'test'
232 230 }
233 231 )
234 232
235 233 assertr = AssertResponse(response)
236 234 msg = validators.UniqSystemEmail()()._messages['email_taken']
237 235 assertr.element_contains('#email+.error-message', msg)
238 236
239 237 def test_register_err_same_email_case_sensitive(self):
240 238 response = self.app.post(
241 239 register_url,
242 240 {
243 241 'username': 'test_admin_1',
244 242 'password': 'test12',
245 243 'password_confirmation': 'test12',
246 244 'email': 'TesT_Admin@mail.COM',
247 245 'firstname': 'test',
248 246 'lastname': 'test'
249 247 }
250 248 )
251 249 assertr = AssertResponse(response)
252 250 msg = validators.UniqSystemEmail()()._messages['email_taken']
253 251 assertr.element_contains('#email+.error-message', msg)
254 252
255 253 def test_register_err_wrong_data(self):
256 254 response = self.app.post(
257 255 register_url,
258 256 {
259 257 'username': 'xs',
260 258 'password': 'test',
261 259 'password_confirmation': 'test',
262 260 'email': 'goodmailm',
263 261 'firstname': 'test',
264 262 'lastname': 'test'
265 263 }
266 264 )
267 265 assert response.status == '200 OK'
268 266 response.mustcontain('An email address must contain a single @')
269 267 response.mustcontain('Enter a value 6 characters long or more')
270 268
271 269 def test_register_err_username(self):
272 270 response = self.app.post(
273 271 register_url,
274 272 {
275 273 'username': 'error user',
276 274 'password': 'test12',
277 275 'password_confirmation': 'test12',
278 276 'email': 'goodmailm',
279 277 'firstname': 'test',
280 278 'lastname': 'test'
281 279 }
282 280 )
283 281
284 282 response.mustcontain('An email address must contain a single @')
285 283 response.mustcontain(
286 284 'Username may only contain '
287 285 'alphanumeric characters underscores, '
288 286 'periods or dashes and must begin with '
289 287 'alphanumeric character')
290 288
291 289 def test_register_err_case_sensitive(self):
292 290 usr = 'Test_Admin'
293 291 response = self.app.post(
294 292 register_url,
295 293 {
296 294 'username': usr,
297 295 'password': 'test12',
298 296 'password_confirmation': 'test12',
299 297 'email': 'goodmailm',
300 298 'firstname': 'test',
301 299 'lastname': 'test'
302 300 }
303 301 )
304 302
305 303 assertr = AssertResponse(response)
306 304 msg = validators.ValidUsername()._messages['username_exists']
307 305 msg = msg % {'username': usr}
308 306 assertr.element_contains('#username+.error-message', msg)
309 307
310 308 def test_register_special_chars(self):
311 309 response = self.app.post(
312 310 register_url,
313 311 {
314 312 'username': 'xxxaxn',
315 313 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
316 314 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
317 315 'email': 'goodmailm@test.plx',
318 316 'firstname': 'test',
319 317 'lastname': 'test'
320 318 }
321 319 )
322 320
323 321 msg = validators.ValidPassword()._messages['invalid_password']
324 322 response.mustcontain(msg)
325 323
326 324 def test_register_password_mismatch(self):
327 325 response = self.app.post(
328 326 register_url,
329 327 {
330 328 'username': 'xs',
331 329 'password': '123qwe',
332 330 'password_confirmation': 'qwe123',
333 331 'email': 'goodmailm@test.plxa',
334 332 'firstname': 'test',
335 333 'lastname': 'test'
336 334 }
337 335 )
338 336 msg = validators.ValidPasswordsMatch()._messages['password_mismatch']
339 337 response.mustcontain(msg)
340 338
341 339 def test_register_ok(self):
342 340 username = 'test_regular4'
343 341 password = 'qweqwe'
344 342 email = 'marcin@test.com'
345 343 name = 'testname'
346 344 lastname = 'testlastname'
347 345
348 346 response = self.app.post(
349 347 register_url,
350 348 {
351 349 'username': username,
352 350 'password': password,
353 351 'password_confirmation': password,
354 352 'email': email,
355 353 'firstname': name,
356 354 'lastname': lastname,
357 355 'admin': True
358 356 }
359 357 ) # This should be overriden
360 358 assert response.status == '302 Found'
361 359 assert_session_flash(
362 360 response, 'You have successfully registered with RhodeCode')
363 361
364 362 ret = Session().query(User).filter(
365 363 User.username == 'test_regular4').one()
366 364 assert ret.username == username
367 365 assert check_password(password, ret.password)
368 366 assert ret.email == email
369 367 assert ret.name == name
370 368 assert ret.lastname == lastname
371 369 assert ret.api_key is not None
372 370 assert not ret.admin
373 371
374 372 def test_forgot_password_wrong_mail(self):
375 373 bad_email = 'marcin@wrongmail.org'
376 374 response = self.app.post(
377 pwd_reset_url,
378 {'email': bad_email, }
375 pwd_reset_url, {'email': bad_email, }
379 376 )
377 assert_session_flash(response,
378 'If such email exists, a password reset link was sent to it.')
380 379
381 msg = validators.ValidSystemEmail()._messages['non_existing_email']
382 msg = h.html_escape(msg % {'email': bad_email})
383 response.mustcontain()
384
385 def test_forgot_password(self):
380 def test_forgot_password(self, user_util):
386 381 response = self.app.get(pwd_reset_url)
387 382 assert response.status == '200 OK'
388 383
389 username = 'test_password_reset_1'
390 password = 'qweqwe'
391 email = 'marcin@python-works.com'
392 name = 'passwd'
393 lastname = 'reset'
384 user = user_util.create_user()
385 user_id = user.user_id
386 email = user.email
394 387
395 new = User()
396 new.username = username
397 new.password = password
398 new.email = email
399 new.name = name
400 new.lastname = lastname
401 new.api_key = generate_auth_token(username)
402 Session().add(new)
403 Session().commit()
388 response = self.app.post(pwd_reset_url, {'email': email, })
404 389
405 response = self.app.post(pwd_reset_url,
406 {'email': email, })
407
408 assert_session_flash(
409 response, 'Your password reset link was sent')
410
411 response = response.follow()
390 assert_session_flash(response,
391 'If such email exists, a password reset link was sent to it.')
412 392
413 393 # BAD KEY
414
415 key = "bad"
416 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key)
394 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, 'badkey')
417 395 response = self.app.get(confirm_url)
418 396 assert response.status == '302 Found'
419 397 assert response.location.endswith(pwd_reset_url)
398 assert_session_flash(response, 'Given reset token is invalid')
399
400 response.follow() # cleanup flash
420 401
421 402 # GOOD KEY
403 key = UserApiKeys.query()\
404 .filter(UserApiKeys.user_id == user_id)\
405 .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\
406 .first()
422 407
423 key = User.get_by_username(username).api_key
424 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key)
408 assert key
409
410 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key.api_key)
425 411 response = self.app.get(confirm_url)
426 412 assert response.status == '302 Found'
427 413 assert response.location.endswith(login_url)
428 414
429 415 assert_session_flash(
430 416 response,
431 417 'Your password reset was successful, '
432 418 'a new password has been sent to your email')
433 419
434 response = response.follow()
420 response.follow()
435 421
436 422 def _get_api_whitelist(self, values=None):
437 423 config = {'api_access_controllers_whitelist': values or []}
438 424 return config
439 425
440 426 @pytest.mark.parametrize("test_name, auth_token", [
441 427 ('none', None),
442 428 ('empty_string', ''),
443 429 ('fake_number', '123456'),
444 430 ('proper_auth_token', None)
445 431 ])
446 432 def test_access_not_whitelisted_page_via_auth_token(
447 433 self, test_name, auth_token, user_admin):
448 434
449 435 whitelist = self._get_api_whitelist([])
450 436 with mock.patch.dict('rhodecode.CONFIG', whitelist):
451 437 assert [] == whitelist['api_access_controllers_whitelist']
452 438 if test_name == 'proper_auth_token':
453 439 # use builtin if api_key is None
454 440 auth_token = user_admin.api_key
455 441
456 442 with fixture.anon_access(False):
457 443 self.app.get(url(controller='changeset',
458 444 action='changeset_raw',
459 445 repo_name=HG_REPO, revision='tip',
460 446 api_key=auth_token),
461 447 status=302)
462 448
463 449 @pytest.mark.parametrize("test_name, auth_token, code", [
464 450 ('none', None, 302),
465 451 ('empty_string', '', 302),
466 452 ('fake_number', '123456', 302),
467 453 ('proper_auth_token', None, 200)
468 454 ])
469 455 def test_access_whitelisted_page_via_auth_token(
470 456 self, test_name, auth_token, code, user_admin):
471 457
472 458 whitelist_entry = ['ChangesetController:changeset_raw']
473 459 whitelist = self._get_api_whitelist(whitelist_entry)
474 460
475 461 with mock.patch.dict('rhodecode.CONFIG', whitelist):
476 462 assert whitelist_entry == whitelist['api_access_controllers_whitelist']
477 463
478 464 if test_name == 'proper_auth_token':
479 465 auth_token = user_admin.api_key
480 466
481 467 with fixture.anon_access(False):
482 468 self.app.get(url(controller='changeset',
483 469 action='changeset_raw',
484 470 repo_name=HG_REPO, revision='tip',
485 471 api_key=auth_token),
486 472 status=code)
487 473
488 474 def test_access_page_via_extra_auth_token(self):
489 475 whitelist = self._get_api_whitelist(
490 476 ['ChangesetController:changeset_raw'])
491 477 with mock.patch.dict('rhodecode.CONFIG', whitelist):
492 478 assert ['ChangesetController:changeset_raw'] == \
493 479 whitelist['api_access_controllers_whitelist']
494 480
495 481 new_auth_token = AuthTokenModel().create(
496 482 TEST_USER_ADMIN_LOGIN, 'test')
497 483 Session().commit()
498 484 with fixture.anon_access(False):
499 485 self.app.get(url(controller='changeset',
500 486 action='changeset_raw',
501 487 repo_name=HG_REPO, revision='tip',
502 488 api_key=new_auth_token.api_key),
503 489 status=200)
504 490
505 491 def test_access_page_via_expired_auth_token(self):
506 492 whitelist = self._get_api_whitelist(
507 493 ['ChangesetController:changeset_raw'])
508 494 with mock.patch.dict('rhodecode.CONFIG', whitelist):
509 495 assert ['ChangesetController:changeset_raw'] == \
510 496 whitelist['api_access_controllers_whitelist']
511 497
512 498 new_auth_token = AuthTokenModel().create(
513 499 TEST_USER_ADMIN_LOGIN, 'test')
514 500 Session().commit()
515 501 # patch the api key and make it expired
516 502 new_auth_token.expires = 0
517 503 Session().add(new_auth_token)
518 504 Session().commit()
519 505 with fixture.anon_access(False):
520 506 self.app.get(url(controller='changeset',
521 507 action='changeset_raw',
522 508 repo_name=HG_REPO, revision='tip',
523 509 api_key=new_auth_token.api_key),
524 510 status=302)
525
526
527 class TestPasswordReset(TestController):
528
529 @pytest.mark.parametrize(
530 'pwd_reset_setting, show_link, show_reset', [
531 ('hg.password_reset.enabled', True, True),
532 ('hg.password_reset.hidden', False, True),
533 ('hg.password_reset.disabled', False, False),
534 ])
535 def test_password_reset_settings(
536 self, pwd_reset_setting, show_link, show_reset):
537 clear_all_caches()
538 self.log_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
539 params = {
540 'csrf_token': self.csrf_token,
541 'anonymous': 'True',
542 'default_register': 'hg.register.auto_activate',
543 'default_register_message': '',
544 'default_password_reset': pwd_reset_setting,
545 'default_extern_activate': 'hg.extern_activate.auto',
546 }
547 resp = self.app.post(url('admin_permissions_application'), params=params)
548 self.logout_user()
549
550 login_page = self.app.get(login_url)
551 asr_login = AssertResponse(login_page)
552 index_page = self.app.get(index_url)
553 asr_index = AssertResponse(index_page)
554
555 if show_link:
556 asr_login.one_element_exists('a.pwd_reset')
557 asr_index.one_element_exists('a.pwd_reset')
558 else:
559 asr_login.no_element_exists('a.pwd_reset')
560 asr_index.no_element_exists('a.pwd_reset')
561
562 pwdreset_page = self.app.get(pwd_reset_url)
563
564 asr_reset = AssertResponse(pwdreset_page)
565 if show_reset:
566 assert 'Send password reset email' in pwdreset_page
567 asr_reset.one_element_exists('#email')
568 asr_reset.one_element_exists('#send')
569 else:
570 assert 'Password reset is disabled.' in pwdreset_page
571 asr_reset.no_element_exists('#email')
572 asr_reset.no_element_exists('#send')
573
574 def test_password_form_disabled(self):
575 self.log_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
576 params = {
577 'csrf_token': self.csrf_token,
578 'anonymous': 'True',
579 'default_register': 'hg.register.auto_activate',
580 'default_register_message': '',
581 'default_password_reset': 'hg.password_reset.disabled',
582 'default_extern_activate': 'hg.extern_activate.auto',
583 }
584 self.app.post(url('admin_permissions_application'), params=params)
585 self.logout_user()
586
587 pwdreset_page = self.app.post(
588 pwd_reset_url,
589 {'email': 'lisa@rhodecode.com',}
590 )
591 assert 'Password reset is disabled.' in pwdreset_page
General Comments 0
You need to be logged in to leave comments. Login now