##// END OF EJS Templates
login: simplify came_from validation...
Søren Løvborg -
r5510:a0a9ae75 stable
parent child Browse files
Show More
@@ -1,270 +1,257 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.controllers.login
16 16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
17 17
18 18 Login controller for Kallithea
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Apr 22, 2010
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28
29 29 import logging
30 30 import formencode
31 31 import urlparse
32 32
33 33 from formencode import htmlfill
34 34 from webob.exc import HTTPFound, HTTPBadRequest
35 35 from pylons.i18n.translation import _
36 36 from pylons.controllers.util import redirect
37 37 from pylons import request, session, tmpl_context as c, url
38 38
39 39 import kallithea.lib.helpers as h
40 40 from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator
41 41 from kallithea.lib.base import BaseController, log_in_user, render
42 42 from kallithea.lib.exceptions import UserCreationError
43 43 from kallithea.lib.utils2 import safe_str
44 44 from kallithea.model.db import User, Setting
45 45 from kallithea.model.forms import \
46 46 LoginForm, RegisterForm, PasswordResetRequestForm, PasswordResetConfirmationForm
47 47 from kallithea.model.user import UserModel
48 48 from kallithea.model.meta import Session
49 49
50 50
51 51 log = logging.getLogger(__name__)
52 52
53 53
54 54 class LoginController(BaseController):
55 55
56 56 def __before__(self):
57 57 super(LoginController, self).__before__()
58 58
59 59 def _validate_came_from(self, came_from):
60 60 """Return True if came_from is valid and can and should be used"""
61 if not came_from:
62 return False
63
64 parsed = urlparse.urlparse(came_from)
65 server_parsed = urlparse.urlparse(url.current())
66 allowed_schemes = ['http', 'https']
67 if parsed.scheme and parsed.scheme not in allowed_schemes:
68 log.error('Suspicious URL scheme detected %s for url %s',
69 parsed.scheme, parsed)
70 return False
71 if server_parsed.netloc != parsed.netloc:
72 log.error('Suspicious NETLOC detected %s for url %s server url '
73 'is: %s' % (parsed.netloc, parsed, server_parsed))
74 return False
75 return True
61 url = urlparse.urlsplit(came_from)
62 return not url.scheme and not url.netloc
76 63
77 64 def index(self):
78 65 c.came_from = safe_str(request.GET.pop('came_from', ''))
79 66 if c.came_from:
80 67 if not self._validate_came_from(c.came_from):
81 68 log.error('Invalid came_from (not server-relative): %r', c.came_from)
82 69 raise HTTPBadRequest()
83 70 came_from = url(c.came_from, **request.GET)
84 71 else:
85 72 c.came_from = came_from = url('home')
86 73
87 74 not_default = self.authuser.username != User.DEFAULT_USER
88 75 ip_allowed = AuthUser.check_ip_allowed(self.authuser, self.ip_addr)
89 76
90 77 # redirect if already logged in
91 78 if self.authuser.is_authenticated and not_default and ip_allowed:
92 79 raise HTTPFound(location=came_from)
93 80
94 81 if request.POST:
95 82 # import Login Form validator class
96 83 login_form = LoginForm()
97 84 try:
98 85 c.form_result = login_form.to_python(dict(request.POST))
99 86 # form checks for username/password, now we're authenticated
100 87 username = c.form_result['username']
101 88 user = User.get_by_username(username, case_insensitive=True)
102 89 except formencode.Invalid as errors:
103 90 defaults = errors.value
104 91 # remove password from filling in form again
105 92 del defaults['password']
106 93 return htmlfill.render(
107 94 render('/login.html'),
108 95 defaults=errors.value,
109 96 errors=errors.error_dict or {},
110 97 prefix_error=False,
111 98 encoding="UTF-8",
112 99 force_defaults=False)
113 100 except UserCreationError as e:
114 101 # container auth or other auth functions that create users on
115 102 # the fly can throw this exception signaling that there's issue
116 103 # with user creation, explanation should be provided in
117 104 # Exception itself
118 105 h.flash(e, 'error')
119 106 else:
120 107 log_in_user(user, c.form_result['remember'],
121 108 is_external_auth=False)
122 109 raise HTTPFound(location=came_from)
123 110
124 111 return render('/login.html')
125 112
126 113 @HasPermissionAnyDecorator('hg.admin', 'hg.register.auto_activate',
127 114 'hg.register.manual_activate')
128 115 def register(self):
129 116 c.auto_active = 'hg.register.auto_activate' in User.get_default_user()\
130 117 .AuthUser.permissions['global']
131 118
132 119 settings = Setting.get_app_settings()
133 120 captcha_private_key = settings.get('captcha_private_key')
134 121 c.captcha_active = bool(captcha_private_key)
135 122 c.captcha_public_key = settings.get('captcha_public_key')
136 123
137 124 if request.POST:
138 125 register_form = RegisterForm()()
139 126 try:
140 127 form_result = register_form.to_python(dict(request.POST))
141 128 form_result['active'] = c.auto_active
142 129
143 130 if c.captcha_active:
144 131 from kallithea.lib.recaptcha import submit
145 132 response = submit(request.POST.get('recaptcha_challenge_field'),
146 133 request.POST.get('recaptcha_response_field'),
147 134 private_key=captcha_private_key,
148 135 remoteip=self.ip_addr)
149 136 if c.captcha_active and not response.is_valid:
150 137 _value = form_result
151 138 _msg = _('Bad captcha')
152 139 error_dict = {'recaptcha_field': _msg}
153 140 raise formencode.Invalid(_msg, _value, None,
154 141 error_dict=error_dict)
155 142
156 143 UserModel().create_registration(form_result)
157 144 h.flash(_('You have successfully registered into Kallithea'),
158 145 category='success')
159 146 Session().commit()
160 147 return redirect(url('login_home'))
161 148
162 149 except formencode.Invalid as errors:
163 150 return htmlfill.render(
164 151 render('/register.html'),
165 152 defaults=errors.value,
166 153 errors=errors.error_dict or {},
167 154 prefix_error=False,
168 155 encoding="UTF-8",
169 156 force_defaults=False)
170 157 except UserCreationError as e:
171 158 # container auth or other auth functions that create users on
172 159 # the fly can throw this exception signaling that there's issue
173 160 # with user creation, explanation should be provided in
174 161 # Exception itself
175 162 h.flash(e, 'error')
176 163
177 164 return render('/register.html')
178 165
179 166 def password_reset(self):
180 167 settings = Setting.get_app_settings()
181 168 captcha_private_key = settings.get('captcha_private_key')
182 169 c.captcha_active = bool(captcha_private_key)
183 170 c.captcha_public_key = settings.get('captcha_public_key')
184 171
185 172 if request.POST:
186 173 password_reset_form = PasswordResetRequestForm()()
187 174 try:
188 175 form_result = password_reset_form.to_python(dict(request.POST))
189 176 if c.captcha_active:
190 177 from kallithea.lib.recaptcha import submit
191 178 response = submit(request.POST.get('recaptcha_challenge_field'),
192 179 request.POST.get('recaptcha_response_field'),
193 180 private_key=captcha_private_key,
194 181 remoteip=self.ip_addr)
195 182 if c.captcha_active and not response.is_valid:
196 183 _value = form_result
197 184 _msg = _('Bad captcha')
198 185 error_dict = {'recaptcha_field': _msg}
199 186 raise formencode.Invalid(_msg, _value, None,
200 187 error_dict=error_dict)
201 188 redirect_link = UserModel().send_reset_password_email(form_result)
202 189 h.flash(_('A password reset confirmation code has been sent'),
203 190 category='success')
204 191 return redirect(redirect_link)
205 192
206 193 except formencode.Invalid as errors:
207 194 return htmlfill.render(
208 195 render('/password_reset.html'),
209 196 defaults=errors.value,
210 197 errors=errors.error_dict or {},
211 198 prefix_error=False,
212 199 encoding="UTF-8",
213 200 force_defaults=False)
214 201
215 202 return render('/password_reset.html')
216 203
217 204 def password_reset_confirmation(self):
218 205 # This controller handles both GET and POST requests, though we
219 206 # only ever perform the actual password change on POST (since
220 207 # GET requests are not allowed to have side effects, and do not
221 208 # receive automatic CSRF protection).
222 209
223 210 # The template needs the email address outside of the form.
224 211 c.email = request.params.get('email')
225 212
226 213 if not request.POST:
227 214 return htmlfill.render(
228 215 render('/password_reset_confirmation.html'),
229 216 defaults=dict(request.params),
230 217 encoding='UTF-8')
231 218
232 219 form = PasswordResetConfirmationForm()()
233 220 try:
234 221 form_result = form.to_python(dict(request.POST))
235 222 except formencode.Invalid as errors:
236 223 return htmlfill.render(
237 224 render('/password_reset_confirmation.html'),
238 225 defaults=errors.value,
239 226 errors=errors.error_dict or {},
240 227 prefix_error=False,
241 228 encoding='UTF-8')
242 229
243 230 if not UserModel().verify_reset_password_token(
244 231 form_result['email'],
245 232 form_result['timestamp'],
246 233 form_result['token'],
247 234 ):
248 235 return htmlfill.render(
249 236 render('/password_reset_confirmation.html'),
250 237 defaults=form_result,
251 238 errors={'token': _('Invalid password reset token')},
252 239 prefix_error=False,
253 240 encoding='UTF-8')
254 241
255 242 UserModel().reset_password(form_result['email'], form_result['password'])
256 243 h.flash(_('Successfully updated password'), category='success')
257 244 return redirect(url('login_home'))
258 245
259 246 def logout(self):
260 247 session.delete()
261 248 log.info('Logging out and deleting session for user')
262 249 redirect(url('home'))
263 250
264 251 def authentication_token(self):
265 252 """Return the CSRF protection token for the session - just like it
266 253 could have been screen scraped from a page with a form.
267 254 Only intended for testing but might also be useful for other kinds
268 255 of automation.
269 256 """
270 257 return h.authentication_token()
@@ -1,500 +1,496 b''
1 1 # -*- coding: utf-8 -*-
2 2 import re
3 3 import time
4 4
5 5 import mock
6 6
7 7 from kallithea.tests import *
8 8 from kallithea.tests.fixture import Fixture
9 9 from kallithea.lib.utils2 import generate_api_key
10 10 from kallithea.lib.auth import check_password
11 11 from kallithea.lib import helpers as h
12 12 from kallithea.model.api_key import ApiKeyModel
13 13 from kallithea.model import validators
14 14 from kallithea.model.db import User, Notification
15 15 from kallithea.model.meta import Session
16 16 from kallithea.model.user import UserModel
17 17
18 18 fixture = Fixture()
19 19
20 20
21 21 class TestLoginController(TestController):
22 22 def setUp(self):
23 23 self.remove_all_notifications()
24 24 self.assertEqual(Notification.query().all(), [])
25 25
26 26 def test_index(self):
27 27 response = self.app.get(url(controller='login', action='index'))
28 28 self.assertEqual(response.status, '200 OK')
29 29 # Test response...
30 30
31 31 def test_login_admin_ok(self):
32 32 response = self.app.post(url(controller='login', action='index'),
33 33 {'username': TEST_USER_ADMIN_LOGIN,
34 34 'password': TEST_USER_ADMIN_PASS})
35 35 self.assertEqual(response.status, '302 Found')
36 36 self.assert_authenticated_user(response, TEST_USER_ADMIN_LOGIN)
37 37
38 38 response = response.follow()
39 39 response.mustcontain('/%s' % HG_REPO)
40 40
41 41 def test_login_regular_ok(self):
42 42 response = self.app.post(url(controller='login', action='index'),
43 43 {'username': TEST_USER_REGULAR_LOGIN,
44 44 'password': TEST_USER_REGULAR_PASS})
45 45
46 46 self.assertEqual(response.status, '302 Found')
47 47 self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
48 48
49 49 response = response.follow()
50 50 response.mustcontain('/%s' % HG_REPO)
51 51
52 52 def test_login_ok_came_from(self):
53 53 test_came_from = '/_admin/users'
54 54 response = self.app.post(url(controller='login', action='index',
55 55 came_from=test_came_from),
56 56 {'username': TEST_USER_ADMIN_LOGIN,
57 57 'password': TEST_USER_ADMIN_PASS})
58 58 self.assertEqual(response.status, '302 Found')
59 59 response = response.follow()
60 60
61 61 self.assertEqual(response.status, '200 OK')
62 62 response.mustcontain('Users Administration')
63 63
64 64 def test_login_do_not_remember(self):
65 65 response = self.app.post(url(controller='login', action='index'),
66 66 {'username': TEST_USER_REGULAR_LOGIN,
67 67 'password': TEST_USER_REGULAR_PASS,
68 68 'remember': False})
69 69
70 70 self.assertIn('Set-Cookie', response.headers)
71 71 for cookie in response.headers.getall('Set-Cookie'):
72 72 self.assertFalse(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
73 73 'Cookie %r has expiration date, but should be a session cookie' % cookie)
74 74
75 75 def test_login_remember(self):
76 76 response = self.app.post(url(controller='login', action='index'),
77 77 {'username': TEST_USER_REGULAR_LOGIN,
78 78 'password': TEST_USER_REGULAR_PASS,
79 79 'remember': True})
80 80
81 81 self.assertIn('Set-Cookie', response.headers)
82 82 for cookie in response.headers.getall('Set-Cookie'):
83 83 self.assertTrue(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
84 84 'Cookie %r should have expiration date, but is a session cookie' % cookie)
85 85
86 86 def test_logout(self):
87 87 response = self.app.post(url(controller='login', action='index'),
88 88 {'username': TEST_USER_REGULAR_LOGIN,
89 89 'password': TEST_USER_REGULAR_PASS})
90 90
91 91 # Verify that a login session has been established.
92 92 response = self.app.get(url(controller='login', action='index'))
93 93 response = response.follow()
94 94 self.assertIn('authuser', response.session)
95 95
96 96 response.click('Log Out')
97 97
98 98 # Verify that the login session has been terminated.
99 99 response = self.app.get(url(controller='login', action='index'))
100 100 self.assertNotIn('authuser', response.session)
101 101
102 102 @parameterized.expand([
103 103 ('data:text/html,<script>window.alert("xss")</script>',),
104 104 ('mailto:test@example.com',),
105 105 ('file:///etc/passwd',),
106 106 ('ftp://ftp.example.com',),
107 107 ('http://other.example.com/bl%C3%A5b%C3%A6rgr%C3%B8d',),
108 ('//evil.example.com/',),
108 109 ])
109 110 def test_login_bad_came_froms(self, url_came_from):
110 111 response = self.app.post(url(controller='login', action='index',
111 112 came_from=url_came_from),
112 113 {'username': TEST_USER_ADMIN_LOGIN,
113 'password': TEST_USER_ADMIN_PASS})
114 self.assertEqual(response.status, '302 Found')
115 self.assertEqual(response._environ['paste.testing_variables']
116 ['tmpl_context'].came_from, '/')
117 response = response.follow()
118
119 self.assertEqual(response.status, '200 OK')
114 'password': TEST_USER_ADMIN_PASS},
115 status=400)
120 116
121 117 def test_login_short_password(self):
122 118 response = self.app.post(url(controller='login', action='index'),
123 119 {'username': TEST_USER_ADMIN_LOGIN,
124 120 'password': 'as'})
125 121 self.assertEqual(response.status, '200 OK')
126 122
127 123 response.mustcontain('Enter 3 characters or more')
128 124
129 125 def test_login_wrong_username_password(self):
130 126 response = self.app.post(url(controller='login', action='index'),
131 127 {'username': 'error',
132 128 'password': 'test12'})
133 129
134 130 response.mustcontain('Invalid username or password')
135 131
136 132 # verify that get arguments are correctly passed along login redirection
137 133
138 134 @parameterized.expand([
139 135 ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
140 136 ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
141 137 ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
142 138 ])
143 139 def test_redirection_to_login_form_preserves_get_args(self, args, args_encoded):
144 140 with fixture.anon_access(False):
145 141 response = self.app.get(url(controller='summary', action='index',
146 142 repo_name=HG_REPO,
147 143 **args))
148 144 self.assertEqual(response.status, '302 Found')
149 145 for encoded in args_encoded:
150 146 self.assertIn(encoded, response.location)
151 147
152 148 @parameterized.expand([
153 149 ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
154 150 ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
155 151 ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
156 152 ])
157 153 def test_login_form_preserves_get_args(self, args, args_encoded):
158 154 response = self.app.get(url(controller='login', action='index',
159 155 came_from = '/_admin/users',
160 156 **args))
161 157 for encoded in args_encoded:
162 158 self.assertIn(encoded, response.form.action)
163 159
164 160 @parameterized.expand([
165 161 ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
166 162 ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
167 163 ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
168 164 ])
169 165 def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
170 166 response = self.app.post(url(controller='login', action='index',
171 167 came_from = '/_admin/users',
172 168 **args),
173 169 {'username': TEST_USER_ADMIN_LOGIN,
174 170 'password': TEST_USER_ADMIN_PASS})
175 171 self.assertEqual(response.status, '302 Found')
176 172 for encoded in args_encoded:
177 173 self.assertIn(encoded, response.location)
178 174
179 175 @parameterized.expand([
180 176 ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
181 177 ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
182 178 ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
183 179 ])
184 180 def test_login_form_after_incorrect_login_preserves_get_args(self, args, args_encoded):
185 181 response = self.app.post(url(controller='login', action='index',
186 182 came_from = '/_admin/users',
187 183 **args),
188 184 {'username': 'error',
189 185 'password': 'test12'})
190 186
191 187 response.mustcontain('Invalid username or password')
192 188 for encoded in args_encoded:
193 189 self.assertIn(encoded, response.form.action)
194 190
195 191 #==========================================================================
196 192 # REGISTRATIONS
197 193 #==========================================================================
198 194 def test_register(self):
199 195 response = self.app.get(url(controller='login', action='register'))
200 196 response.mustcontain('Sign Up')
201 197
202 198 def test_register_err_same_username(self):
203 199 uname = TEST_USER_ADMIN_LOGIN
204 200 response = self.app.post(url(controller='login', action='register'),
205 201 {'username': uname,
206 202 'password': 'test12',
207 203 'password_confirmation': 'test12',
208 204 'email': 'goodmail@example.com',
209 205 'firstname': 'test',
210 206 'lastname': 'test'})
211 207
212 208 msg = validators.ValidUsername()._messages['username_exists']
213 209 msg = h.html_escape(msg % {'username': uname})
214 210 response.mustcontain(msg)
215 211
216 212 def test_register_err_same_email(self):
217 213 response = self.app.post(url(controller='login', action='register'),
218 214 {'username': 'test_admin_0',
219 215 'password': 'test12',
220 216 'password_confirmation': 'test12',
221 217 'email': TEST_USER_ADMIN_EMAIL,
222 218 'firstname': 'test',
223 219 'lastname': 'test'})
224 220
225 221 msg = validators.UniqSystemEmail()()._messages['email_taken']
226 222 response.mustcontain(msg)
227 223
228 224 def test_register_err_same_email_case_sensitive(self):
229 225 response = self.app.post(url(controller='login', action='register'),
230 226 {'username': 'test_admin_1',
231 227 'password': 'test12',
232 228 'password_confirmation': 'test12',
233 229 'email': TEST_USER_ADMIN_EMAIL.title(),
234 230 'firstname': 'test',
235 231 'lastname': 'test'})
236 232 msg = validators.UniqSystemEmail()()._messages['email_taken']
237 233 response.mustcontain(msg)
238 234
239 235 def test_register_err_wrong_data(self):
240 236 response = self.app.post(url(controller='login', action='register'),
241 237 {'username': 'xs',
242 238 'password': 'test',
243 239 'password_confirmation': 'test',
244 240 'email': 'goodmailm',
245 241 'firstname': 'test',
246 242 'lastname': 'test'})
247 243 self.assertEqual(response.status, '200 OK')
248 244 response.mustcontain('An email address must contain a single @')
249 245 response.mustcontain('Enter a value 6 characters long or more')
250 246
251 247 def test_register_err_username(self):
252 248 response = self.app.post(url(controller='login', action='register'),
253 249 {'username': 'error user',
254 250 'password': 'test12',
255 251 'password_confirmation': 'test12',
256 252 'email': 'goodmailm',
257 253 'firstname': 'test',
258 254 'lastname': 'test'})
259 255
260 256 response.mustcontain('An email address must contain a single @')
261 257 response.mustcontain('Username may only contain '
262 258 'alphanumeric characters underscores, '
263 259 'periods or dashes and must begin with an '
264 260 'alphanumeric character')
265 261
266 262 def test_register_err_case_sensitive(self):
267 263 usr = TEST_USER_ADMIN_LOGIN.title()
268 264 response = self.app.post(url(controller='login', action='register'),
269 265 {'username': usr,
270 266 'password': 'test12',
271 267 'password_confirmation': 'test12',
272 268 'email': 'goodmailm',
273 269 'firstname': 'test',
274 270 'lastname': 'test'})
275 271
276 272 response.mustcontain('An email address must contain a single @')
277 273 msg = validators.ValidUsername()._messages['username_exists']
278 274 msg = h.html_escape(msg % {'username': usr})
279 275 response.mustcontain(msg)
280 276
281 277 def test_register_special_chars(self):
282 278 response = self.app.post(url(controller='login', action='register'),
283 279 {'username': 'xxxaxn',
284 280 'password': 'ąćźżąśśśś',
285 281 'password_confirmation': 'ąćźżąśśśś',
286 282 'email': 'goodmailm@test.plx',
287 283 'firstname': 'test',
288 284 'lastname': 'test'})
289 285
290 286 msg = validators.ValidPassword()._messages['invalid_password']
291 287 response.mustcontain(msg)
292 288
293 289 def test_register_password_mismatch(self):
294 290 response = self.app.post(url(controller='login', action='register'),
295 291 {'username': 'xs',
296 292 'password': '123qwe',
297 293 'password_confirmation': 'qwe123',
298 294 'email': 'goodmailm@test.plxa',
299 295 'firstname': 'test',
300 296 'lastname': 'test'})
301 297 msg = validators.ValidPasswordsMatch('password', 'password_confirmation')._messages['password_mismatch']
302 298 response.mustcontain(msg)
303 299
304 300 def test_register_ok(self):
305 301 username = 'test_regular4'
306 302 password = 'qweqwe'
307 303 email = 'user4@example.com'
308 304 name = 'testname'
309 305 lastname = 'testlastname'
310 306
311 307 response = self.app.post(url(controller='login', action='register'),
312 308 {'username': username,
313 309 'password': password,
314 310 'password_confirmation': password,
315 311 'email': email,
316 312 'firstname': name,
317 313 'lastname': lastname,
318 314 'admin': True}) # This should be overriden
319 315 self.assertEqual(response.status, '302 Found')
320 316 self.checkSessionFlash(response, 'You have successfully registered into Kallithea')
321 317
322 318 ret = Session().query(User).filter(User.username == 'test_regular4').one()
323 319 self.assertEqual(ret.username, username)
324 320 self.assertEqual(check_password(password, ret.password), True)
325 321 self.assertEqual(ret.email, email)
326 322 self.assertEqual(ret.name, name)
327 323 self.assertEqual(ret.lastname, lastname)
328 324 self.assertNotEqual(ret.api_key, None)
329 325 self.assertEqual(ret.admin, False)
330 326
331 327 #==========================================================================
332 328 # PASSWORD RESET
333 329 #==========================================================================
334 330
335 331 def test_forgot_password_wrong_mail(self):
336 332 bad_email = 'username%wrongmail.org'
337 333 response = self.app.post(
338 334 url(controller='login', action='password_reset'),
339 335 {'email': bad_email, }
340 336 )
341 337
342 338 response.mustcontain('An email address must contain a single @')
343 339
344 340 def test_forgot_password(self):
345 341 response = self.app.get(url(controller='login',
346 342 action='password_reset'))
347 343 self.assertEqual(response.status, '200 OK')
348 344
349 345 username = 'test_password_reset_1'
350 346 password = 'qweqwe'
351 347 email = 'username@example.com'
352 348 name = 'passwd'
353 349 lastname = 'reset'
354 350 timestamp = int(time.time())
355 351
356 352 new = User()
357 353 new.username = username
358 354 new.password = password
359 355 new.email = email
360 356 new.name = name
361 357 new.lastname = lastname
362 358 new.api_key = generate_api_key()
363 359 Session().add(new)
364 360 Session().commit()
365 361
366 362 response = self.app.post(url(controller='login',
367 363 action='password_reset'),
368 364 {'email': email, })
369 365
370 366 self.checkSessionFlash(response, 'A password reset confirmation code has been sent')
371 367
372 368 response = response.follow()
373 369
374 370 # BAD TOKEN
375 371
376 372 token = "bad"
377 373
378 374 response = self.app.post(url(controller='login',
379 375 action='password_reset_confirmation'),
380 376 {'email': email,
381 377 'timestamp': timestamp,
382 378 'password': "p@ssw0rd",
383 379 'password_confirm': "p@ssw0rd",
384 380 'token': token,
385 381 })
386 382 self.assertEqual(response.status, '200 OK')
387 383 response.mustcontain('Invalid password reset token')
388 384
389 385 # GOOD TOKEN
390 386
391 387 # TODO: The token should ideally be taken from the mail sent
392 388 # above, instead of being recalculated.
393 389
394 390 token = UserModel().get_reset_password_token(
395 391 User.get_by_username(username), timestamp, self.authentication_token())
396 392
397 393 response = self.app.get(url(controller='login',
398 394 action='password_reset_confirmation',
399 395 email=email,
400 396 timestamp=timestamp,
401 397 token=token))
402 398 self.assertEqual(response.status, '200 OK')
403 399 response.mustcontain("You are about to set a new password for the email address %s" % email)
404 400
405 401 response = self.app.post(url(controller='login',
406 402 action='password_reset_confirmation'),
407 403 {'email': email,
408 404 'timestamp': timestamp,
409 405 'password': "p@ssw0rd",
410 406 'password_confirm': "p@ssw0rd",
411 407 'token': token,
412 408 })
413 409 self.assertEqual(response.status, '302 Found')
414 410 self.checkSessionFlash(response, 'Successfully updated password')
415 411
416 412 response = response.follow()
417 413
418 414 #==========================================================================
419 415 # API
420 416 #==========================================================================
421 417
422 418 def _get_api_whitelist(self, values=None):
423 419 config = {'api_access_controllers_whitelist': values or []}
424 420 return config
425 421
426 422 @parameterized.expand([
427 423 ('none', None),
428 424 ('empty_string', ''),
429 425 ('fake_number', '123456'),
430 426 ('proper_api_key', None)
431 427 ])
432 428 def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key):
433 429 whitelist = self._get_api_whitelist([])
434 430 with mock.patch('kallithea.CONFIG', whitelist):
435 431 self.assertEqual([],
436 432 whitelist['api_access_controllers_whitelist'])
437 433 if test_name == 'proper_api_key':
438 434 #use builtin if api_key is None
439 435 api_key = User.get_first_admin().api_key
440 436
441 437 with fixture.anon_access(False):
442 438 self.app.get(url(controller='changeset',
443 439 action='changeset_raw',
444 440 repo_name=HG_REPO, revision='tip', api_key=api_key),
445 441 status=403)
446 442
447 443 @parameterized.expand([
448 444 ('none', None, 302),
449 445 ('empty_string', '', 302),
450 446 ('fake_number', '123456', 302),
451 447 ('fake_not_alnum', 'a-z', 302),
452 448 ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 302),
453 449 ('proper_api_key', None, 200)
454 450 ])
455 451 def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
456 452 whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
457 453 with mock.patch('kallithea.CONFIG', whitelist):
458 454 self.assertEqual(['ChangesetController:changeset_raw'],
459 455 whitelist['api_access_controllers_whitelist'])
460 456 if test_name == 'proper_api_key':
461 457 api_key = User.get_first_admin().api_key
462 458
463 459 with fixture.anon_access(False):
464 460 self.app.get(url(controller='changeset',
465 461 action='changeset_raw',
466 462 repo_name=HG_REPO, revision='tip', api_key=api_key),
467 463 status=code)
468 464
469 465 def test_access_page_via_extra_api_key(self):
470 466 whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
471 467 with mock.patch('kallithea.CONFIG', whitelist):
472 468 self.assertEqual(['ChangesetController:changeset_raw'],
473 469 whitelist['api_access_controllers_whitelist'])
474 470
475 471 new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
476 472 Session().commit()
477 473 with fixture.anon_access(False):
478 474 self.app.get(url(controller='changeset',
479 475 action='changeset_raw',
480 476 repo_name=HG_REPO, revision='tip', api_key=new_api_key.api_key),
481 477 status=200)
482 478
483 479 def test_access_page_via_expired_api_key(self):
484 480 whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
485 481 with mock.patch('kallithea.CONFIG', whitelist):
486 482 self.assertEqual(['ChangesetController:changeset_raw'],
487 483 whitelist['api_access_controllers_whitelist'])
488 484
489 485 new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
490 486 Session().commit()
491 487 #patch the API key and make it expired
492 488 new_api_key.expires = 0
493 489 Session().add(new_api_key)
494 490 Session().commit()
495 491 with fixture.anon_access(False):
496 492 self.app.get(url(controller='changeset',
497 493 action='changeset_raw',
498 494 repo_name=HG_REPO, revision='tip',
499 495 api_key=new_api_key.api_key),
500 496 status=302)
General Comments 0
You need to be logged in to leave comments. Login now