##// END OF EJS Templates
pytest: Fix the user login function....
johbo -
r41:6cf4cae4 default
parent child Browse files
Show More
@@ -1,254 +1,259 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import datetime
25 25 import hashlib
26 26 import tempfile
27 27 from os.path import join as jn
28 28
29 29 from tempfile import _RandomNameSequence
30 30
31 31 from paste.deploy import loadapp
32 32 from paste.script.appinstall import SetupCommand
33 33
34 34 import pylons
35 35 import pylons.test
36 36 from pylons import config, url
37 37 from pylons.i18n.translation import _get_translator
38 38 from pylons.util import ContextObj
39 39
40 40 from routes.util import URLGenerator
41 41 from webtest import TestApp
42 42 from nose.plugins.skip import SkipTest
43 43 import pytest
44 44
45 45 from rhodecode import is_windows
46 46 from rhodecode.config.routing import ADMIN_PREFIX
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.db import User
49 49 from rhodecode.lib import auth
50 50 from rhodecode.lib.helpers import flash, link_to
51 51 from rhodecode.lib.utils2 import safe_unicode, safe_str
52 from rhodecode.tests.utils import get_session_from_response
52 53
53 54 # TODO: johbo: Solve time zone related issues and remove this tweak
54 55 os.environ['TZ'] = 'UTC'
55 56 if not is_windows:
56 57 time.tzset()
57 58
58 59 log = logging.getLogger(__name__)
59 60
60 61 __all__ = [
61 62 'get_new_dir', 'TestController', 'SkipTest',
62 63 'url', 'link_to', 'ldap_lib_installed', 'clear_all_caches',
63 64 'assert_session_flash', 'login_user',
64 65 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO',
65 66 'NEW_HG_REPO', 'NEW_GIT_REPO',
66 67 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
67 68 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
68 69 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
69 70 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
70 71 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
71 72 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS',
72 73 ]
73 74
74 75 # Invoke websetup with the current config file
75 76 # SetupCommand('setup-app').run([config_file])
76 77
77 78 # SOME GLOBALS FOR TESTS
78 79 TEST_DIR = tempfile.gettempdir()
79 80
80 81 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next())
81 82 TEST_USER_ADMIN_LOGIN = 'test_admin'
82 83 TEST_USER_ADMIN_PASS = 'test12'
83 84 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
84 85
85 86 TEST_USER_REGULAR_LOGIN = 'test_regular'
86 87 TEST_USER_REGULAR_PASS = 'test12'
87 88 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
88 89
89 90 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
90 91 TEST_USER_REGULAR2_PASS = 'test12'
91 92 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
92 93
93 94 HG_REPO = 'vcs_test_hg'
94 95 GIT_REPO = 'vcs_test_git'
95 96 SVN_REPO = 'vcs_test_svn'
96 97
97 98 NEW_HG_REPO = 'vcs_test_hg_new'
98 99 NEW_GIT_REPO = 'vcs_test_git_new'
99 100
100 101 HG_FORK = 'vcs_test_hg_fork'
101 102 GIT_FORK = 'vcs_test_git_fork'
102 103
103 104 ## VCS
104 105 SCM_TESTS = ['hg', 'git']
105 106 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
106 107
107 108 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
108 109 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
109 110 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
110 111
111 112 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
112 113 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
113 114 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
114 115
115 116 TEST_REPO_PREFIX = 'vcs-test'
116 117
117 118
118 119 # skip ldap tests if LDAP lib is not installed
119 120 ldap_lib_installed = False
120 121 try:
121 122 import ldap
122 123 ldap_lib_installed = True
123 124 except ImportError:
124 125 # means that python-ldap is not installed
125 126 pass
126 127
127 128
128 129 def clear_all_caches():
129 130 from beaker.cache import cache_managers
130 131 for _cache in cache_managers.values():
131 132 _cache.clear()
132 133
133 134
134 135 def get_new_dir(title):
135 136 """
136 137 Returns always new directory path.
137 138 """
138 139 from rhodecode.tests.vcs.utils import get_normalized_path
139 140 name_parts = [TEST_REPO_PREFIX]
140 141 if title:
141 142 name_parts.append(title)
142 143 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
143 144 name_parts.append(hex_str)
144 145 name = '-'.join(name_parts)
145 146 path = os.path.join(TEST_DIR, name)
146 147 return get_normalized_path(path)
147 148
148 149
149 150 @pytest.mark.usefixtures('app', 'index_location')
150 151 class TestController(object):
151 152
152 153 maxDiff = None
153 154
154 155 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
155 156 password=TEST_USER_ADMIN_PASS):
156 157 self._logged_username = username
157 158 self._session = login_user_session(self.app, username, password)
158 159 self.csrf_token = auth.get_csrf_token(self._session)
159 160
160 161 return self._session['rhodecode_user']
161 162
162 163 def logout_user(self):
163 164 logout_user_session(self.app, auth.get_csrf_token(self._session))
164 165 self.csrf_token = None
165 166 self._logged_username = None
166 167 self._session = None
167 168
168 169 def _get_logged_user(self):
169 170 return User.get_by_username(self._logged_username)
170 171
171 172 # TODO: remove, use plain assert in tests
172 173 def assertEqual(self, a, b, msg=None):
173 174 if msg:
174 175 assert a == b, msg
175 176 else:
176 177 assert a == b
177 178
178 179
179 180 def login_user_session(
180 181 app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
181 182 from rhodecode.tests.functional.test_login import login_url
182 183 response = app.post(
183 184 login_url,
184 185 {'username': username, 'password': password})
185 186 if 'invalid user name' in response.body:
186 187 pytest.fail('could not login using %s %s' % (username, password))
187 188
188 189 assert response.status == '302 Found'
189 ses = response.session['rhodecode_user']
190 assert ses.get('username') == username
191 190 response = response.follow()
192 assert ses.get('is_authenticated')
191 assert response.status == '200 OK'
193 192
194 return response.session
193 session = get_session_from_response(response)
194 assert 'rhodecode_user' in session
195 rc_user = session['rhodecode_user']
196 assert rc_user.get('username') == username
197 assert rc_user.get('is_authenticated')
198
199 return session
195 200
196 201
197 202 def logout_user_session(app, csrf_token):
198 203 from rhodecode.tests.functional.test_login import logut_url
199 204 app.post(logut_url, {'csrf_token': csrf_token}, status=302)
200 205
201 206
202 207 def login_user(app, username=TEST_USER_ADMIN_LOGIN,
203 208 password=TEST_USER_ADMIN_PASS):
204 209 return login_user_session(app, username, password)['rhodecode_user']
205 210
206 211
207 212 def assert_session_flash(response=None, msg=None, category=None):
208 213 """
209 214 Assert on a flash message in the current session.
210 215
211 216 :param msg: Required. The expected message. Will be evaluated if a
212 217 :class:`LazyString` is passed in.
213 218 :param response: Optional. For functional testing, pass in the response
214 219 object. Otherwise don't pass in any value.
215 220 :param category: Optional. If passed, the message category will be
216 221 checked as well.
217 222 """
218 223 if msg is None:
219 224 raise ValueError("Parameter msg is required.")
220 225
221 226 messages = flash.pop_messages()
222 227 message = messages[0]
223 228
224 229 msg = _eval_if_lazy(msg)
225 230 message_text = _eval_if_lazy(message.message)
226 231
227 232 if msg not in message_text:
228 233 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
229 234 msg, message_text)
230 235 pytest.fail(safe_str(msg))
231 236 if category:
232 237 assert category == message.category
233 238
234 239
235 240 def _eval_if_lazy(value):
236 241 return value.eval() if hasattr(value, 'eval') else value
237 242
238 243
239 244 def assert_not_in_session_flash(response, msg, category=None):
240 245 assert 'flash' in response.session, 'Response session has no flash key'
241 246 message_category, message_text = response.session['flash'][0]
242 247 if msg in message_text:
243 248 msg = u'msg `%s` found in session flash: got `%s` instead' % (
244 249 msg, message_text)
245 250 pytest.fail(safe_str(msg))
246 251 if category:
247 252 assert category == message_category
248 253
249 254
250 255 def assert_session_flash_is_empty(response):
251 256 if 'flash' in response.session:
252 257 msg = 'flash messages are present in session:%s' % \
253 258 response.session['flash'][0]
254 259 pytest.fail(safe_str(msg))
@@ -1,272 +1,282 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess
26 26 import urllib2
27 27 from urlparse import urlparse, parse_qsl
28 28 from urllib import unquote_plus
29 29
30 30 import pytest
31 31 import rc_testdata
32 32 from lxml.html import fromstring, tostring
33 33 from lxml.cssselect import CSSSelector
34 34
35 35 from rhodecode.model.db import User
36 36 from rhodecode.model.meta import Session
37 37 from rhodecode.model.scm import ScmModel
38 38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
39 39
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 def set_anonymous_access(enabled):
45 45 """(Dis)allows anonymous access depending on parameter `enabled`"""
46 46 user = User.get_default_user()
47 47 user.active = enabled
48 48 Session().add(user)
49 49 Session().commit()
50 50 log.info('anonymous access is now: %s', enabled)
51 51 assert enabled == User.get_default_user().active, (
52 52 'Cannot set anonymous access')
53 53
54 54
55 55 def check_xfail_backends(node, backend_alias):
56 56 # Using "xfail_backends" here intentionally, since this marks work
57 57 # which is "to be done" soon.
58 58 skip_marker = node.get_marker('xfail_backends')
59 59 if skip_marker and backend_alias in skip_marker.args:
60 60 msg = "Support for backend %s to be developed." % (backend_alias, )
61 61 msg = skip_marker.kwargs.get('reason', msg)
62 62 pytest.xfail(msg)
63 63
64 64
65 65 def check_skip_backends(node, backend_alias):
66 66 # Using "skip_backends" here intentionally, since this marks work which is
67 67 # not supported.
68 68 skip_marker = node.get_marker('skip_backends')
69 69 if skip_marker and backend_alias in skip_marker.args:
70 70 msg = "Feature not supported for backend %s." % (backend_alias, )
71 71 msg = skip_marker.kwargs.get('reason', msg)
72 72 pytest.skip(msg)
73 73
74 74
75 75 def extract_git_repo_from_dump(dump_name, repo_name):
76 76 """Create git repo `repo_name` from dump `dump_name`."""
77 77 repos_path = ScmModel().repos_path
78 78 target_path = os.path.join(repos_path, repo_name)
79 79 rc_testdata.extract_git_dump(dump_name, target_path)
80 80 return target_path
81 81
82 82
83 83 def extract_hg_repo_from_dump(dump_name, repo_name):
84 84 """Create hg repo `repo_name` from dump `dump_name`."""
85 85 repos_path = ScmModel().repos_path
86 86 target_path = os.path.join(repos_path, repo_name)
87 87 rc_testdata.extract_hg_dump(dump_name, target_path)
88 88 return target_path
89 89
90 90
91 91 def extract_svn_repo_from_dump(dump_name, repo_name):
92 92 """Create a svn repo `repo_name` from dump `dump_name`."""
93 93 repos_path = ScmModel().repos_path
94 94 target_path = os.path.join(repos_path, repo_name)
95 95 SubversionRepository(target_path, create=True)
96 96 _load_svn_dump_into_repo(dump_name, target_path)
97 97 return target_path
98 98
99 99
100 100 def assert_message_in_log(log_records, message, levelno, module):
101 101 messages = [
102 102 r.message for r in log_records
103 103 if r.module == module and r.levelno == levelno
104 104 ]
105 105 assert message in messages
106 106
107 107
108 108 def _load_svn_dump_into_repo(dump_name, repo_path):
109 109 """
110 110 Utility to populate a svn repository with a named dump
111 111
112 112 Currently the dumps are in rc_testdata. They might later on be
113 113 integrated with the main repository once they stabilize more.
114 114 """
115 115 dump = rc_testdata.load_svn_dump(dump_name)
116 116 load_dump = subprocess.Popen(
117 117 ['svnadmin', 'load', repo_path],
118 118 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
119 119 stderr=subprocess.PIPE)
120 120 out, err = load_dump.communicate(dump)
121 121 if load_dump.returncode != 0:
122 122 log.error("Output of load_dump command: %s", out)
123 123 log.error("Error output of load_dump command: %s", err)
124 124 raise Exception(
125 125 'Failed to load dump "%s" into repository at path "%s".'
126 126 % (dump_name, repo_path))
127 127
128 128
129 129 class AssertResponse(object):
130 130 """
131 131 Utility that helps to assert things about a given HTML response.
132 132 """
133 133
134 134 def __init__(self, response):
135 135 self.response = response
136 136
137 137 def one_element_exists(self, css_selector):
138 138 self.get_element(css_selector)
139 139
140 140 def no_element_exists(self, css_selector):
141 141 assert not self._get_elements(css_selector)
142 142
143 143 def element_equals_to(self, css_selector, expected_content):
144 144 element = self.get_element(css_selector)
145 145 element_text = self._element_to_string(element)
146 146 assert expected_content in element_text
147 147
148 148 def element_contains(self, css_selector, expected_content):
149 149 element = self.get_element(css_selector)
150 150 assert expected_content in element.text_content()
151 151
152 152 def contains_one_link(self, link_text, href):
153 153 doc = fromstring(self.response.body)
154 154 sel = CSSSelector('a[href]')
155 155 elements = [
156 156 e for e in sel(doc) if e.text_content().strip() == link_text]
157 157 assert len(elements) == 1, "Did not find link or found multiple links"
158 158 self._ensure_url_equal(elements[0].attrib.get('href'), href)
159 159
160 160 def contains_one_anchor(self, anchor_id):
161 161 doc = fromstring(self.response.body)
162 162 sel = CSSSelector('#' + anchor_id)
163 163 elements = sel(doc)
164 164 assert len(elements) == 1
165 165
166 166 def _ensure_url_equal(self, found, expected):
167 167 assert _Url(found) == _Url(expected)
168 168
169 169 def get_element(self, css_selector):
170 170 elements = self._get_elements(css_selector)
171 171 assert len(elements) == 1
172 172 return elements[0]
173 173
174 174 def _get_elements(self, css_selector):
175 175 doc = fromstring(self.response.body)
176 176 sel = CSSSelector(css_selector)
177 177 elements = sel(doc)
178 178 return elements
179 179
180 180 def _element_to_string(self, element):
181 181 return tostring(element)
182 182
183 183
184 184 class _Url(object):
185 185 """
186 186 A url object that can be compared with other url orbjects
187 187 without regard to the vagaries of encoding, escaping, and ordering
188 188 of parameters in query strings.
189 189
190 190 Inspired by
191 191 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
192 192 """
193 193
194 194 def __init__(self, url):
195 195 parts = urlparse(url)
196 196 _query = frozenset(parse_qsl(parts.query))
197 197 _path = unquote_plus(parts.path)
198 198 parts = parts._replace(query=_query, path=_path)
199 199 self.parts = parts
200 200
201 201 def __eq__(self, other):
202 202 return self.parts == other.parts
203 203
204 204 def __hash__(self):
205 205 return hash(self.parts)
206 206
207 207
208 208 def run_test_concurrently(times, raise_catched_exc=True):
209 209 """
210 210 Add this decorator to small pieces of code that you want to test
211 211 concurrently
212 212
213 213 ex:
214 214
215 215 @test_concurrently(25)
216 216 def my_test_function():
217 217 ...
218 218 """
219 219 def test_concurrently_decorator(test_func):
220 220 def wrapper(*args, **kwargs):
221 221 exceptions = []
222 222
223 223 def call_test_func():
224 224 try:
225 225 test_func(*args, **kwargs)
226 226 except Exception, e:
227 227 exceptions.append(e)
228 228 if raise_catched_exc:
229 229 raise
230 230 threads = []
231 231 for i in range(times):
232 232 threads.append(threading.Thread(target=call_test_func))
233 233 for t in threads:
234 234 t.start()
235 235 for t in threads:
236 236 t.join()
237 237 if exceptions:
238 238 raise Exception(
239 239 'test_concurrently intercepted %s exceptions: %s' % (
240 240 len(exceptions), exceptions))
241 241 return wrapper
242 242 return test_concurrently_decorator
243 243
244 244
245 245 def wait_for_url(url, timeout=10):
246 246 """
247 247 Wait until URL becomes reachable.
248 248
249 249 It polls the URL until the timeout is reached or it became reachable.
250 250 If will call to `py.test.fail` in case the URL is not reachable.
251 251 """
252 252 timeout = time.time() + timeout
253 253 last = 0
254 254 wait = 0.1
255 255
256 256 while (timeout > last):
257 257 last = time.time()
258 258 if is_url_reachable(url):
259 259 break
260 260 elif ((last + wait) > time.time()):
261 261 # Go to sleep because not enough time has passed since last check.
262 262 time.sleep(wait)
263 263 else:
264 264 pytest.fail("Timeout while waiting for URL {}".format(url))
265 265
266 266
267 267 def is_url_reachable(url):
268 268 try:
269 269 urllib2.urlopen(url)
270 270 except urllib2.URLError:
271 271 return False
272 272 return True
273
274
275 def get_session_from_response(response):
276 """
277 This returns the session from a response object. Pylons has some magic
278 to make the session available as `response.session`. But pyramid
279 doesn't expose it.
280 """
281 # TODO: Try to look up the session key also.
282 return response.request.environ['beaker.session']
General Comments 0
You need to be logged in to leave comments. Login now