##// END OF EJS Templates
codingstyle: trivial whitespace fixes...
codingstyle: trivial whitespace fixes Reported by flake8.

File last commit:

r6789:76912908 default
r6789:76912908 default
Show More
test_login.py
516 lines | 23.4 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
import re
import time
import urlparse
import mock
from kallithea.tests.base import *
from kallithea.tests.fixture import Fixture
from kallithea.lib.utils2 import generate_api_key
from kallithea.lib.auth import check_password
from kallithea.lib import helpers as h
from kallithea.model.api_key import ApiKeyModel
from kallithea.model import validators
from kallithea.model.db import User, Notification
from kallithea.model.meta import Session
from kallithea.model.user import UserModel
from tg.util.webtest import test_context
fixture = Fixture()
class TestLoginController(TestController):
def setup_method(self, method):
self.remove_all_notifications()
assert Notification.query().all() == []
def test_index(self):
response = self.app.get(url(controller='login', action='index'))
assert response.status == '200 OK'
# Test response...
def test_login_admin_ok(self):
response = self.app.post(url(controller='login', action='index'),
{'username': TEST_USER_ADMIN_LOGIN,
'password': TEST_USER_ADMIN_PASS})
assert response.status == '302 Found'
self.assert_authenticated_user(response, TEST_USER_ADMIN_LOGIN)
response = response.follow()
response.mustcontain('/%s' % HG_REPO)
def test_login_regular_ok(self):
response = self.app.post(url(controller='login', action='index'),
{'username': TEST_USER_REGULAR_LOGIN,
'password': TEST_USER_REGULAR_PASS})
assert response.status == '302 Found'
self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
response = response.follow()
response.mustcontain('/%s' % HG_REPO)
def test_login_regular_email_ok(self):
response = self.app.post(url(controller='login', action='index'),
{'username': TEST_USER_REGULAR_EMAIL,
'password': TEST_USER_REGULAR_PASS})
assert response.status == '302 Found'
self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
response = response.follow()
response.mustcontain('/%s' % HG_REPO)
def test_login_ok_came_from(self):
test_came_from = '/_admin/users'
response = self.app.post(url(controller='login', action='index',
came_from=test_came_from),
{'username': TEST_USER_ADMIN_LOGIN,
'password': TEST_USER_ADMIN_PASS})
assert response.status == '302 Found'
response = response.follow()
assert response.status == '200 OK'
response.mustcontain('Users Administration')
def test_login_do_not_remember(self):
response = self.app.post(url(controller='login', action='index'),
{'username': TEST_USER_REGULAR_LOGIN,
'password': TEST_USER_REGULAR_PASS,
'remember': False})
assert 'Set-Cookie' in response.headers
for cookie in response.headers.getall('Set-Cookie'):
assert not re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r has expiration date, but should be a session cookie' % cookie
def test_login_remember(self):
response = self.app.post(url(controller='login', action='index'),
{'username': TEST_USER_REGULAR_LOGIN,
'password': TEST_USER_REGULAR_PASS,
'remember': True})
assert 'Set-Cookie' in response.headers
for cookie in response.headers.getall('Set-Cookie'):
assert re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r should have expiration date, but is a session cookie' % cookie
def test_logout(self):
response = self.app.post(url(controller='login', action='index'),
{'username': TEST_USER_REGULAR_LOGIN,
'password': TEST_USER_REGULAR_PASS})
# Verify that a login session has been established.
response = self.app.get(url(controller='login', action='index'))
response = response.follow()
assert 'authuser' in response.session
response.click('Log Out')
# Verify that the login session has been terminated.
response = self.app.get(url(controller='login', action='index'))
assert 'authuser' not in response.session
@parametrize('url_came_from', [
('data:text/html,<script>window.alert("xss")</script>',),
('mailto:test@example.com',),
('file:///etc/passwd',),
('ftp://ftp.example.com',),
('http://other.example.com/bl%C3%A5b%C3%A6rgr%C3%B8d',),
('//evil.example.com/',),
('/\r\nX-Header-Injection: boo',),
('/invälid_url_bytes',),
('non-absolute-path',),
])
def test_login_bad_came_froms(self, url_came_from):
response = self.app.post(url(controller='login', action='index',
came_from=url_came_from),
{'username': TEST_USER_ADMIN_LOGIN,
'password': TEST_USER_ADMIN_PASS},
status=400)
def test_login_short_password(self):
response = self.app.post(url(controller='login', action='index'),
{'username': TEST_USER_ADMIN_LOGIN,
'password': 'as'})
assert response.status == '200 OK'
response.mustcontain('Enter 3 characters or more')
def test_login_wrong_username_password(self):
response = self.app.post(url(controller='login', action='index'),
{'username': 'error',
'password': 'test12'})
response.mustcontain('Invalid username or password')
# verify that get arguments are correctly passed along login redirection
@parametrize('args,args_encoded', [
({'foo':'one', 'bar':'two'}, (('foo', 'one'), ('bar', 'two'))),
({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
(('blue', u'blå'.encode('utf-8')), ('green', u'grøn'.encode('utf-8')))),
])
def test_redirection_to_login_form_preserves_get_args(self, args, args_encoded):
with fixture.anon_access(False):
response = self.app.get(url(controller='summary', action='index',
repo_name=HG_REPO,
**args))
assert response.status == '302 Found'
came_from = urlparse.parse_qs(urlparse.urlparse(response.location).query)['came_from'][0]
came_from_qs = urlparse.parse_qsl(urlparse.urlparse(came_from).query)
for encoded in args_encoded:
assert encoded in came_from_qs
@parametrize('args,args_encoded', [
({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
({'blue': u'blå', 'green':u'grøn'},
('blue=bl%C3%A5', 'green=gr%C3%B8n')),
])
def test_login_form_preserves_get_args(self, args, args_encoded):
response = self.app.get(url(controller='login', action='index',
came_from=url('/_admin/users', **args)))
came_from = urlparse.parse_qs(urlparse.urlparse(response.form.action).query)['came_from'][0]
for encoded in args_encoded:
assert encoded in came_from
@parametrize('args,args_encoded', [
({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
({'blue': u'blå', 'green':u'grøn'},
('blue=bl%C3%A5', 'green=gr%C3%B8n')),
])
def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
response = self.app.post(url(controller='login', action='index',
came_from=url('/_admin/users', **args)),
{'username': TEST_USER_ADMIN_LOGIN,
'password': TEST_USER_ADMIN_PASS})
assert response.status == '302 Found'
for encoded in args_encoded:
assert encoded in response.location
@parametrize('args,args_encoded', [
({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
({'blue': u'blå', 'green':u'grøn'},
('blue=bl%C3%A5', 'green=gr%C3%B8n')),
])
def test_login_form_after_incorrect_login_preserves_get_args(self, args, args_encoded):
response = self.app.post(url(controller='login', action='index',
came_from=url('/_admin/users', **args)),
{'username': 'error',
'password': 'test12'})
response.mustcontain('Invalid username or password')
came_from = urlparse.parse_qs(urlparse.urlparse(response.form.action).query)['came_from'][0]
for encoded in args_encoded:
assert encoded in came_from
#==========================================================================
# REGISTRATIONS
#==========================================================================
def test_register(self):
response = self.app.get(url(controller='login', action='register'))
response.mustcontain('Sign Up')
def test_register_err_same_username(self):
uname = TEST_USER_ADMIN_LOGIN
response = self.app.post(url(controller='login', action='register'),
{'username': uname,
'password': 'test12',
'password_confirmation': 'test12',
'email': 'goodmail@example.com',
'firstname': 'test',
'lastname': 'test'})
with test_context(self.app):
msg = validators.ValidUsername()._messages['username_exists']
msg = h.html_escape(msg % {'username': uname})
response.mustcontain(msg)
def test_register_err_same_email(self):
response = self.app.post(url(controller='login', action='register'),
{'username': 'test_admin_0',
'password': 'test12',
'password_confirmation': 'test12',
'email': TEST_USER_ADMIN_EMAIL,
'firstname': 'test',
'lastname': 'test'})
with test_context(self.app):
msg = validators.UniqSystemEmail()()._messages['email_taken']
response.mustcontain(msg)
def test_register_err_same_email_case_sensitive(self):
response = self.app.post(url(controller='login', action='register'),
{'username': 'test_admin_1',
'password': 'test12',
'password_confirmation': 'test12',
'email': TEST_USER_ADMIN_EMAIL.title(),
'firstname': 'test',
'lastname': 'test'})
with test_context(self.app):
msg = validators.UniqSystemEmail()()._messages['email_taken']
response.mustcontain(msg)
def test_register_err_wrong_data(self):
response = self.app.post(url(controller='login', action='register'),
{'username': 'xs',
'password': 'test',
'password_confirmation': 'test',
'email': 'goodmailm',
'firstname': 'test',
'lastname': 'test'})
assert response.status == '200 OK'
response.mustcontain('An email address must contain a single @')
response.mustcontain('Enter a value 6 characters long or more')
def test_register_err_username(self):
response = self.app.post(url(controller='login', action='register'),
{'username': 'error user',
'password': 'test12',
'password_confirmation': 'test12',
'email': 'goodmailm',
'firstname': 'test',
'lastname': 'test'})
response.mustcontain('An email address must contain a single @')
response.mustcontain('Username may only contain '
'alphanumeric characters underscores, '
'periods or dashes and must begin with an '
'alphanumeric character')
def test_register_err_case_sensitive(self):
usr = TEST_USER_ADMIN_LOGIN.title()
response = self.app.post(url(controller='login', action='register'),
{'username': usr,
'password': 'test12',
'password_confirmation': 'test12',
'email': 'goodmailm',
'firstname': 'test',
'lastname': 'test'})
response.mustcontain('An email address must contain a single @')
with test_context(self.app):
msg = validators.ValidUsername()._messages['username_exists']
msg = h.html_escape(msg % {'username': usr})
response.mustcontain(msg)
def test_register_special_chars(self):
response = self.app.post(url(controller='login', action='register'),
{'username': 'xxxaxn',
'password': 'ąćźżąśśśś',
'password_confirmation': 'ąćźżąśśśś',
'email': 'goodmailm@test.plx',
'firstname': 'test',
'lastname': 'test'})
with test_context(self.app):
msg = validators.ValidPassword()._messages['invalid_password']
response.mustcontain(msg)
def test_register_password_mismatch(self):
response = self.app.post(url(controller='login', action='register'),
{'username': 'xs',
'password': '123qwe',
'password_confirmation': 'qwe123',
'email': 'goodmailm@test.plxa',
'firstname': 'test',
'lastname': 'test'})
with test_context(self.app):
msg = validators.ValidPasswordsMatch('password', 'password_confirmation')._messages['password_mismatch']
response.mustcontain(msg)
def test_register_ok(self):
username = 'test_regular4'
password = 'qweqwe'
email = 'user4@example.com'
name = 'testname'
lastname = 'testlastname'
response = self.app.post(url(controller='login', action='register'),
{'username': username,
'password': password,
'password_confirmation': password,
'email': email,
'firstname': name,
'lastname': lastname,
'admin': True}) # This should be overridden
assert response.status == '302 Found'
self.checkSessionFlash(response, 'You have successfully registered with Kallithea')
ret = Session().query(User).filter(User.username == 'test_regular4').one()
assert ret.username == username
assert check_password(password, ret.password) == True
assert ret.email == email
assert ret.name == name
assert ret.lastname == lastname
assert ret.api_key != None
assert ret.admin == False
#==========================================================================
# PASSWORD RESET
#==========================================================================
def test_forgot_password_wrong_mail(self):
bad_email = 'username%wrongmail.org'
response = self.app.post(
url(controller='login', action='password_reset'),
{'email': bad_email, }
)
response.mustcontain('An email address must contain a single @')
def test_forgot_password(self):
response = self.app.get(url(controller='login',
action='password_reset'))
assert response.status == '200 OK'
username = 'test_password_reset_1'
password = 'qweqwe'
email = 'username@example.com'
name = u'passwd'
lastname = u'reset'
timestamp = int(time.time())
new = User()
new.username = username
new.password = password
new.email = email
new.name = name
new.lastname = lastname
new.api_key = generate_api_key()
Session().add(new)
Session().commit()
response = self.app.post(url(controller='login',
action='password_reset'),
{'email': email, })
self.checkSessionFlash(response, 'A password reset confirmation code has been sent')
response = response.follow()
# BAD TOKEN
token = "bad"
response = self.app.post(url(controller='login',
action='password_reset_confirmation'),
{'email': email,
'timestamp': timestamp,
'password': "p@ssw0rd",
'password_confirm': "p@ssw0rd",
'token': token,
})
assert response.status == '200 OK'
response.mustcontain('Invalid password reset token')
# GOOD TOKEN
# TODO: The token should ideally be taken from the mail sent
# above, instead of being recalculated.
token = UserModel().get_reset_password_token(
User.get_by_username(username), timestamp, self.authentication_token())
response = self.app.get(url(controller='login',
action='password_reset_confirmation',
email=email,
timestamp=timestamp,
token=token))
assert response.status == '200 OK'
response.mustcontain("You are about to set a new password for the email address %s" % email)
response = self.app.post(url(controller='login',
action='password_reset_confirmation'),
{'email': email,
'timestamp': timestamp,
'password': "p@ssw0rd",
'password_confirm': "p@ssw0rd",
'token': token,
})
assert response.status == '302 Found'
self.checkSessionFlash(response, 'Successfully updated password')
response = response.follow()
#==========================================================================
# API
#==========================================================================
def _get_api_whitelist(self, values=None):
config = {'api_access_controllers_whitelist': values or []}
return config
def _api_key_test(self, api_key, status):
"""Verifies HTTP status code for accessing an auth-requiring page,
using the given api_key URL parameter as well as using the API key
with bearer authentication.
If api_key is None, no api_key is passed at all. If api_key is True,
a real, working API key is used.
"""
with fixture.anon_access(False):
if api_key is None:
params = {}
headers = {}
else:
if api_key is True:
api_key = User.get_first_admin().api_key
params = {'api_key': api_key}
headers = {'Authorization': 'Bearer ' + str(api_key)}
self.app.get(url(controller='changeset', action='changeset_raw',
repo_name=HG_REPO, revision='tip', **params),
status=status)
self.app.get(url(controller='changeset', action='changeset_raw',
repo_name=HG_REPO, revision='tip'),
headers=headers,
status=status)
@parametrize('test_name,api_key,code', [
('none', None, 302),
('empty_string', '', 403),
('fake_number', '123456', 403),
('proper_api_key', True, 403)
])
def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key, code):
whitelist = self._get_api_whitelist([])
with mock.patch('kallithea.CONFIG', whitelist):
assert [] == whitelist['api_access_controllers_whitelist']
self._api_key_test(api_key, code)
@parametrize('test_name,api_key,code', [
('none', None, 302),
('empty_string', '', 403),
('fake_number', '123456', 403),
('fake_not_alnum', 'a-z', 403),
('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 403),
('proper_api_key', True, 200)
])
def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
with mock.patch('kallithea.CONFIG', whitelist):
assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
self._api_key_test(api_key, code)
def test_access_page_via_extra_api_key(self):
whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
with mock.patch('kallithea.CONFIG', whitelist):
assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
Session().commit()
self._api_key_test(new_api_key.api_key, status=200)
def test_access_page_via_expired_api_key(self):
whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
with mock.patch('kallithea.CONFIG', whitelist):
assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
Session().commit()
# patch the API key and make it expired
new_api_key.expires = 0
Session().commit()
self._api_key_test(new_api_key.api_key, status=403)