##// END OF EJS Templates
pytest: Fix the user login function....
johbo -
r41:6cf4cae4 default
parent child Browse files
Show More
@@ -1,254 +1,259 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import datetime
24 import datetime
25 import hashlib
25 import hashlib
26 import tempfile
26 import tempfile
27 from os.path import join as jn
27 from os.path import join as jn
28
28
29 from tempfile import _RandomNameSequence
29 from tempfile import _RandomNameSequence
30
30
31 from paste.deploy import loadapp
31 from paste.deploy import loadapp
32 from paste.script.appinstall import SetupCommand
32 from paste.script.appinstall import SetupCommand
33
33
34 import pylons
34 import pylons
35 import pylons.test
35 import pylons.test
36 from pylons import config, url
36 from pylons import config, url
37 from pylons.i18n.translation import _get_translator
37 from pylons.i18n.translation import _get_translator
38 from pylons.util import ContextObj
38 from pylons.util import ContextObj
39
39
40 from routes.util import URLGenerator
40 from routes.util import URLGenerator
41 from webtest import TestApp
41 from webtest import TestApp
42 from nose.plugins.skip import SkipTest
42 from nose.plugins.skip import SkipTest
43 import pytest
43 import pytest
44
44
45 from rhodecode import is_windows
45 from rhodecode import is_windows
46 from rhodecode.config.routing import ADMIN_PREFIX
46 from rhodecode.config.routing import ADMIN_PREFIX
47 from rhodecode.model.meta import Session
47 from rhodecode.model.meta import Session
48 from rhodecode.model.db import User
48 from rhodecode.model.db import User
49 from rhodecode.lib import auth
49 from rhodecode.lib import auth
50 from rhodecode.lib.helpers import flash, link_to
50 from rhodecode.lib.helpers import flash, link_to
51 from rhodecode.lib.utils2 import safe_unicode, safe_str
51 from rhodecode.lib.utils2 import safe_unicode, safe_str
52 from rhodecode.tests.utils import get_session_from_response
52
53
53 # TODO: johbo: Solve time zone related issues and remove this tweak
54 # TODO: johbo: Solve time zone related issues and remove this tweak
54 os.environ['TZ'] = 'UTC'
55 os.environ['TZ'] = 'UTC'
55 if not is_windows:
56 if not is_windows:
56 time.tzset()
57 time.tzset()
57
58
58 log = logging.getLogger(__name__)
59 log = logging.getLogger(__name__)
59
60
60 __all__ = [
61 __all__ = [
61 'get_new_dir', 'TestController', 'SkipTest',
62 'get_new_dir', 'TestController', 'SkipTest',
62 'url', 'link_to', 'ldap_lib_installed', 'clear_all_caches',
63 'url', 'link_to', 'ldap_lib_installed', 'clear_all_caches',
63 'assert_session_flash', 'login_user',
64 'assert_session_flash', 'login_user',
64 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO',
65 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO',
65 'NEW_HG_REPO', 'NEW_GIT_REPO',
66 'NEW_HG_REPO', 'NEW_GIT_REPO',
66 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
67 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
67 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
68 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
68 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
69 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
69 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
70 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
70 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
71 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
71 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS',
72 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS',
72 ]
73 ]
73
74
74 # Invoke websetup with the current config file
75 # Invoke websetup with the current config file
75 # SetupCommand('setup-app').run([config_file])
76 # SetupCommand('setup-app').run([config_file])
76
77
77 # SOME GLOBALS FOR TESTS
78 # SOME GLOBALS FOR TESTS
78 TEST_DIR = tempfile.gettempdir()
79 TEST_DIR = tempfile.gettempdir()
79
80
80 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next())
81 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next())
81 TEST_USER_ADMIN_LOGIN = 'test_admin'
82 TEST_USER_ADMIN_LOGIN = 'test_admin'
82 TEST_USER_ADMIN_PASS = 'test12'
83 TEST_USER_ADMIN_PASS = 'test12'
83 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
84 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
84
85
85 TEST_USER_REGULAR_LOGIN = 'test_regular'
86 TEST_USER_REGULAR_LOGIN = 'test_regular'
86 TEST_USER_REGULAR_PASS = 'test12'
87 TEST_USER_REGULAR_PASS = 'test12'
87 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
88 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
88
89
89 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
90 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
90 TEST_USER_REGULAR2_PASS = 'test12'
91 TEST_USER_REGULAR2_PASS = 'test12'
91 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
92 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
92
93
93 HG_REPO = 'vcs_test_hg'
94 HG_REPO = 'vcs_test_hg'
94 GIT_REPO = 'vcs_test_git'
95 GIT_REPO = 'vcs_test_git'
95 SVN_REPO = 'vcs_test_svn'
96 SVN_REPO = 'vcs_test_svn'
96
97
97 NEW_HG_REPO = 'vcs_test_hg_new'
98 NEW_HG_REPO = 'vcs_test_hg_new'
98 NEW_GIT_REPO = 'vcs_test_git_new'
99 NEW_GIT_REPO = 'vcs_test_git_new'
99
100
100 HG_FORK = 'vcs_test_hg_fork'
101 HG_FORK = 'vcs_test_hg_fork'
101 GIT_FORK = 'vcs_test_git_fork'
102 GIT_FORK = 'vcs_test_git_fork'
102
103
103 ## VCS
104 ## VCS
104 SCM_TESTS = ['hg', 'git']
105 SCM_TESTS = ['hg', 'git']
105 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
106 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
106
107
107 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
108 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
108 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
109 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
109 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
110 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
110
111
111 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
112 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
112 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
113 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
113 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
114 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
114
115
115 TEST_REPO_PREFIX = 'vcs-test'
116 TEST_REPO_PREFIX = 'vcs-test'
116
117
117
118
118 # skip ldap tests if LDAP lib is not installed
119 # skip ldap tests if LDAP lib is not installed
119 ldap_lib_installed = False
120 ldap_lib_installed = False
120 try:
121 try:
121 import ldap
122 import ldap
122 ldap_lib_installed = True
123 ldap_lib_installed = True
123 except ImportError:
124 except ImportError:
124 # means that python-ldap is not installed
125 # means that python-ldap is not installed
125 pass
126 pass
126
127
127
128
128 def clear_all_caches():
129 def clear_all_caches():
129 from beaker.cache import cache_managers
130 from beaker.cache import cache_managers
130 for _cache in cache_managers.values():
131 for _cache in cache_managers.values():
131 _cache.clear()
132 _cache.clear()
132
133
133
134
134 def get_new_dir(title):
135 def get_new_dir(title):
135 """
136 """
136 Returns always new directory path.
137 Returns always new directory path.
137 """
138 """
138 from rhodecode.tests.vcs.utils import get_normalized_path
139 from rhodecode.tests.vcs.utils import get_normalized_path
139 name_parts = [TEST_REPO_PREFIX]
140 name_parts = [TEST_REPO_PREFIX]
140 if title:
141 if title:
141 name_parts.append(title)
142 name_parts.append(title)
142 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
143 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
143 name_parts.append(hex_str)
144 name_parts.append(hex_str)
144 name = '-'.join(name_parts)
145 name = '-'.join(name_parts)
145 path = os.path.join(TEST_DIR, name)
146 path = os.path.join(TEST_DIR, name)
146 return get_normalized_path(path)
147 return get_normalized_path(path)
147
148
148
149
149 @pytest.mark.usefixtures('app', 'index_location')
150 @pytest.mark.usefixtures('app', 'index_location')
150 class TestController(object):
151 class TestController(object):
151
152
152 maxDiff = None
153 maxDiff = None
153
154
154 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
155 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
155 password=TEST_USER_ADMIN_PASS):
156 password=TEST_USER_ADMIN_PASS):
156 self._logged_username = username
157 self._logged_username = username
157 self._session = login_user_session(self.app, username, password)
158 self._session = login_user_session(self.app, username, password)
158 self.csrf_token = auth.get_csrf_token(self._session)
159 self.csrf_token = auth.get_csrf_token(self._session)
159
160
160 return self._session['rhodecode_user']
161 return self._session['rhodecode_user']
161
162
162 def logout_user(self):
163 def logout_user(self):
163 logout_user_session(self.app, auth.get_csrf_token(self._session))
164 logout_user_session(self.app, auth.get_csrf_token(self._session))
164 self.csrf_token = None
165 self.csrf_token = None
165 self._logged_username = None
166 self._logged_username = None
166 self._session = None
167 self._session = None
167
168
168 def _get_logged_user(self):
169 def _get_logged_user(self):
169 return User.get_by_username(self._logged_username)
170 return User.get_by_username(self._logged_username)
170
171
171 # TODO: remove, use plain assert in tests
172 # TODO: remove, use plain assert in tests
172 def assertEqual(self, a, b, msg=None):
173 def assertEqual(self, a, b, msg=None):
173 if msg:
174 if msg:
174 assert a == b, msg
175 assert a == b, msg
175 else:
176 else:
176 assert a == b
177 assert a == b
177
178
178
179
179 def login_user_session(
180 def login_user_session(
180 app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
181 app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
181 from rhodecode.tests.functional.test_login import login_url
182 from rhodecode.tests.functional.test_login import login_url
182 response = app.post(
183 response = app.post(
183 login_url,
184 login_url,
184 {'username': username, 'password': password})
185 {'username': username, 'password': password})
185 if 'invalid user name' in response.body:
186 if 'invalid user name' in response.body:
186 pytest.fail('could not login using %s %s' % (username, password))
187 pytest.fail('could not login using %s %s' % (username, password))
187
188
188 assert response.status == '302 Found'
189 assert response.status == '302 Found'
189 ses = response.session['rhodecode_user']
190 assert ses.get('username') == username
191 response = response.follow()
190 response = response.follow()
192 assert ses.get('is_authenticated')
191 assert response.status == '200 OK'
193
192
194 return response.session
193 session = get_session_from_response(response)
194 assert 'rhodecode_user' in session
195 rc_user = session['rhodecode_user']
196 assert rc_user.get('username') == username
197 assert rc_user.get('is_authenticated')
198
199 return session
195
200
196
201
197 def logout_user_session(app, csrf_token):
202 def logout_user_session(app, csrf_token):
198 from rhodecode.tests.functional.test_login import logut_url
203 from rhodecode.tests.functional.test_login import logut_url
199 app.post(logut_url, {'csrf_token': csrf_token}, status=302)
204 app.post(logut_url, {'csrf_token': csrf_token}, status=302)
200
205
201
206
202 def login_user(app, username=TEST_USER_ADMIN_LOGIN,
207 def login_user(app, username=TEST_USER_ADMIN_LOGIN,
203 password=TEST_USER_ADMIN_PASS):
208 password=TEST_USER_ADMIN_PASS):
204 return login_user_session(app, username, password)['rhodecode_user']
209 return login_user_session(app, username, password)['rhodecode_user']
205
210
206
211
207 def assert_session_flash(response=None, msg=None, category=None):
212 def assert_session_flash(response=None, msg=None, category=None):
208 """
213 """
209 Assert on a flash message in the current session.
214 Assert on a flash message in the current session.
210
215
211 :param msg: Required. The expected message. Will be evaluated if a
216 :param msg: Required. The expected message. Will be evaluated if a
212 :class:`LazyString` is passed in.
217 :class:`LazyString` is passed in.
213 :param response: Optional. For functional testing, pass in the response
218 :param response: Optional. For functional testing, pass in the response
214 object. Otherwise don't pass in any value.
219 object. Otherwise don't pass in any value.
215 :param category: Optional. If passed, the message category will be
220 :param category: Optional. If passed, the message category will be
216 checked as well.
221 checked as well.
217 """
222 """
218 if msg is None:
223 if msg is None:
219 raise ValueError("Parameter msg is required.")
224 raise ValueError("Parameter msg is required.")
220
225
221 messages = flash.pop_messages()
226 messages = flash.pop_messages()
222 message = messages[0]
227 message = messages[0]
223
228
224 msg = _eval_if_lazy(msg)
229 msg = _eval_if_lazy(msg)
225 message_text = _eval_if_lazy(message.message)
230 message_text = _eval_if_lazy(message.message)
226
231
227 if msg not in message_text:
232 if msg not in message_text:
228 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
233 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
229 msg, message_text)
234 msg, message_text)
230 pytest.fail(safe_str(msg))
235 pytest.fail(safe_str(msg))
231 if category:
236 if category:
232 assert category == message.category
237 assert category == message.category
233
238
234
239
235 def _eval_if_lazy(value):
240 def _eval_if_lazy(value):
236 return value.eval() if hasattr(value, 'eval') else value
241 return value.eval() if hasattr(value, 'eval') else value
237
242
238
243
239 def assert_not_in_session_flash(response, msg, category=None):
244 def assert_not_in_session_flash(response, msg, category=None):
240 assert 'flash' in response.session, 'Response session has no flash key'
245 assert 'flash' in response.session, 'Response session has no flash key'
241 message_category, message_text = response.session['flash'][0]
246 message_category, message_text = response.session['flash'][0]
242 if msg in message_text:
247 if msg in message_text:
243 msg = u'msg `%s` found in session flash: got `%s` instead' % (
248 msg = u'msg `%s` found in session flash: got `%s` instead' % (
244 msg, message_text)
249 msg, message_text)
245 pytest.fail(safe_str(msg))
250 pytest.fail(safe_str(msg))
246 if category:
251 if category:
247 assert category == message_category
252 assert category == message_category
248
253
249
254
250 def assert_session_flash_is_empty(response):
255 def assert_session_flash_is_empty(response):
251 if 'flash' in response.session:
256 if 'flash' in response.session:
252 msg = 'flash messages are present in session:%s' % \
257 msg = 'flash messages are present in session:%s' % \
253 response.session['flash'][0]
258 response.session['flash'][0]
254 pytest.fail(safe_str(msg))
259 pytest.fail(safe_str(msg))
@@ -1,272 +1,282 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import threading
21 import threading
22 import time
22 import time
23 import logging
23 import logging
24 import os.path
24 import os.path
25 import subprocess
25 import subprocess
26 import urllib2
26 import urllib2
27 from urlparse import urlparse, parse_qsl
27 from urlparse import urlparse, parse_qsl
28 from urllib import unquote_plus
28 from urllib import unquote_plus
29
29
30 import pytest
30 import pytest
31 import rc_testdata
31 import rc_testdata
32 from lxml.html import fromstring, tostring
32 from lxml.html import fromstring, tostring
33 from lxml.cssselect import CSSSelector
33 from lxml.cssselect import CSSSelector
34
34
35 from rhodecode.model.db import User
35 from rhodecode.model.db import User
36 from rhodecode.model.meta import Session
36 from rhodecode.model.meta import Session
37 from rhodecode.model.scm import ScmModel
37 from rhodecode.model.scm import ScmModel
38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
39
39
40
40
41 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
42
42
43
43
44 def set_anonymous_access(enabled):
44 def set_anonymous_access(enabled):
45 """(Dis)allows anonymous access depending on parameter `enabled`"""
45 """(Dis)allows anonymous access depending on parameter `enabled`"""
46 user = User.get_default_user()
46 user = User.get_default_user()
47 user.active = enabled
47 user.active = enabled
48 Session().add(user)
48 Session().add(user)
49 Session().commit()
49 Session().commit()
50 log.info('anonymous access is now: %s', enabled)
50 log.info('anonymous access is now: %s', enabled)
51 assert enabled == User.get_default_user().active, (
51 assert enabled == User.get_default_user().active, (
52 'Cannot set anonymous access')
52 'Cannot set anonymous access')
53
53
54
54
55 def check_xfail_backends(node, backend_alias):
55 def check_xfail_backends(node, backend_alias):
56 # Using "xfail_backends" here intentionally, since this marks work
56 # Using "xfail_backends" here intentionally, since this marks work
57 # which is "to be done" soon.
57 # which is "to be done" soon.
58 skip_marker = node.get_marker('xfail_backends')
58 skip_marker = node.get_marker('xfail_backends')
59 if skip_marker and backend_alias in skip_marker.args:
59 if skip_marker and backend_alias in skip_marker.args:
60 msg = "Support for backend %s to be developed." % (backend_alias, )
60 msg = "Support for backend %s to be developed." % (backend_alias, )
61 msg = skip_marker.kwargs.get('reason', msg)
61 msg = skip_marker.kwargs.get('reason', msg)
62 pytest.xfail(msg)
62 pytest.xfail(msg)
63
63
64
64
65 def check_skip_backends(node, backend_alias):
65 def check_skip_backends(node, backend_alias):
66 # Using "skip_backends" here intentionally, since this marks work which is
66 # Using "skip_backends" here intentionally, since this marks work which is
67 # not supported.
67 # not supported.
68 skip_marker = node.get_marker('skip_backends')
68 skip_marker = node.get_marker('skip_backends')
69 if skip_marker and backend_alias in skip_marker.args:
69 if skip_marker and backend_alias in skip_marker.args:
70 msg = "Feature not supported for backend %s." % (backend_alias, )
70 msg = "Feature not supported for backend %s." % (backend_alias, )
71 msg = skip_marker.kwargs.get('reason', msg)
71 msg = skip_marker.kwargs.get('reason', msg)
72 pytest.skip(msg)
72 pytest.skip(msg)
73
73
74
74
75 def extract_git_repo_from_dump(dump_name, repo_name):
75 def extract_git_repo_from_dump(dump_name, repo_name):
76 """Create git repo `repo_name` from dump `dump_name`."""
76 """Create git repo `repo_name` from dump `dump_name`."""
77 repos_path = ScmModel().repos_path
77 repos_path = ScmModel().repos_path
78 target_path = os.path.join(repos_path, repo_name)
78 target_path = os.path.join(repos_path, repo_name)
79 rc_testdata.extract_git_dump(dump_name, target_path)
79 rc_testdata.extract_git_dump(dump_name, target_path)
80 return target_path
80 return target_path
81
81
82
82
83 def extract_hg_repo_from_dump(dump_name, repo_name):
83 def extract_hg_repo_from_dump(dump_name, repo_name):
84 """Create hg repo `repo_name` from dump `dump_name`."""
84 """Create hg repo `repo_name` from dump `dump_name`."""
85 repos_path = ScmModel().repos_path
85 repos_path = ScmModel().repos_path
86 target_path = os.path.join(repos_path, repo_name)
86 target_path = os.path.join(repos_path, repo_name)
87 rc_testdata.extract_hg_dump(dump_name, target_path)
87 rc_testdata.extract_hg_dump(dump_name, target_path)
88 return target_path
88 return target_path
89
89
90
90
91 def extract_svn_repo_from_dump(dump_name, repo_name):
91 def extract_svn_repo_from_dump(dump_name, repo_name):
92 """Create a svn repo `repo_name` from dump `dump_name`."""
92 """Create a svn repo `repo_name` from dump `dump_name`."""
93 repos_path = ScmModel().repos_path
93 repos_path = ScmModel().repos_path
94 target_path = os.path.join(repos_path, repo_name)
94 target_path = os.path.join(repos_path, repo_name)
95 SubversionRepository(target_path, create=True)
95 SubversionRepository(target_path, create=True)
96 _load_svn_dump_into_repo(dump_name, target_path)
96 _load_svn_dump_into_repo(dump_name, target_path)
97 return target_path
97 return target_path
98
98
99
99
100 def assert_message_in_log(log_records, message, levelno, module):
100 def assert_message_in_log(log_records, message, levelno, module):
101 messages = [
101 messages = [
102 r.message for r in log_records
102 r.message for r in log_records
103 if r.module == module and r.levelno == levelno
103 if r.module == module and r.levelno == levelno
104 ]
104 ]
105 assert message in messages
105 assert message in messages
106
106
107
107
108 def _load_svn_dump_into_repo(dump_name, repo_path):
108 def _load_svn_dump_into_repo(dump_name, repo_path):
109 """
109 """
110 Utility to populate a svn repository with a named dump
110 Utility to populate a svn repository with a named dump
111
111
112 Currently the dumps are in rc_testdata. They might later on be
112 Currently the dumps are in rc_testdata. They might later on be
113 integrated with the main repository once they stabilize more.
113 integrated with the main repository once they stabilize more.
114 """
114 """
115 dump = rc_testdata.load_svn_dump(dump_name)
115 dump = rc_testdata.load_svn_dump(dump_name)
116 load_dump = subprocess.Popen(
116 load_dump = subprocess.Popen(
117 ['svnadmin', 'load', repo_path],
117 ['svnadmin', 'load', repo_path],
118 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
118 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
119 stderr=subprocess.PIPE)
119 stderr=subprocess.PIPE)
120 out, err = load_dump.communicate(dump)
120 out, err = load_dump.communicate(dump)
121 if load_dump.returncode != 0:
121 if load_dump.returncode != 0:
122 log.error("Output of load_dump command: %s", out)
122 log.error("Output of load_dump command: %s", out)
123 log.error("Error output of load_dump command: %s", err)
123 log.error("Error output of load_dump command: %s", err)
124 raise Exception(
124 raise Exception(
125 'Failed to load dump "%s" into repository at path "%s".'
125 'Failed to load dump "%s" into repository at path "%s".'
126 % (dump_name, repo_path))
126 % (dump_name, repo_path))
127
127
128
128
129 class AssertResponse(object):
129 class AssertResponse(object):
130 """
130 """
131 Utility that helps to assert things about a given HTML response.
131 Utility that helps to assert things about a given HTML response.
132 """
132 """
133
133
134 def __init__(self, response):
134 def __init__(self, response):
135 self.response = response
135 self.response = response
136
136
137 def one_element_exists(self, css_selector):
137 def one_element_exists(self, css_selector):
138 self.get_element(css_selector)
138 self.get_element(css_selector)
139
139
140 def no_element_exists(self, css_selector):
140 def no_element_exists(self, css_selector):
141 assert not self._get_elements(css_selector)
141 assert not self._get_elements(css_selector)
142
142
143 def element_equals_to(self, css_selector, expected_content):
143 def element_equals_to(self, css_selector, expected_content):
144 element = self.get_element(css_selector)
144 element = self.get_element(css_selector)
145 element_text = self._element_to_string(element)
145 element_text = self._element_to_string(element)
146 assert expected_content in element_text
146 assert expected_content in element_text
147
147
148 def element_contains(self, css_selector, expected_content):
148 def element_contains(self, css_selector, expected_content):
149 element = self.get_element(css_selector)
149 element = self.get_element(css_selector)
150 assert expected_content in element.text_content()
150 assert expected_content in element.text_content()
151
151
152 def contains_one_link(self, link_text, href):
152 def contains_one_link(self, link_text, href):
153 doc = fromstring(self.response.body)
153 doc = fromstring(self.response.body)
154 sel = CSSSelector('a[href]')
154 sel = CSSSelector('a[href]')
155 elements = [
155 elements = [
156 e for e in sel(doc) if e.text_content().strip() == link_text]
156 e for e in sel(doc) if e.text_content().strip() == link_text]
157 assert len(elements) == 1, "Did not find link or found multiple links"
157 assert len(elements) == 1, "Did not find link or found multiple links"
158 self._ensure_url_equal(elements[0].attrib.get('href'), href)
158 self._ensure_url_equal(elements[0].attrib.get('href'), href)
159
159
160 def contains_one_anchor(self, anchor_id):
160 def contains_one_anchor(self, anchor_id):
161 doc = fromstring(self.response.body)
161 doc = fromstring(self.response.body)
162 sel = CSSSelector('#' + anchor_id)
162 sel = CSSSelector('#' + anchor_id)
163 elements = sel(doc)
163 elements = sel(doc)
164 assert len(elements) == 1
164 assert len(elements) == 1
165
165
166 def _ensure_url_equal(self, found, expected):
166 def _ensure_url_equal(self, found, expected):
167 assert _Url(found) == _Url(expected)
167 assert _Url(found) == _Url(expected)
168
168
169 def get_element(self, css_selector):
169 def get_element(self, css_selector):
170 elements = self._get_elements(css_selector)
170 elements = self._get_elements(css_selector)
171 assert len(elements) == 1
171 assert len(elements) == 1
172 return elements[0]
172 return elements[0]
173
173
174 def _get_elements(self, css_selector):
174 def _get_elements(self, css_selector):
175 doc = fromstring(self.response.body)
175 doc = fromstring(self.response.body)
176 sel = CSSSelector(css_selector)
176 sel = CSSSelector(css_selector)
177 elements = sel(doc)
177 elements = sel(doc)
178 return elements
178 return elements
179
179
180 def _element_to_string(self, element):
180 def _element_to_string(self, element):
181 return tostring(element)
181 return tostring(element)
182
182
183
183
184 class _Url(object):
184 class _Url(object):
185 """
185 """
186 A url object that can be compared with other url orbjects
186 A url object that can be compared with other url orbjects
187 without regard to the vagaries of encoding, escaping, and ordering
187 without regard to the vagaries of encoding, escaping, and ordering
188 of parameters in query strings.
188 of parameters in query strings.
189
189
190 Inspired by
190 Inspired by
191 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
191 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
192 """
192 """
193
193
194 def __init__(self, url):
194 def __init__(self, url):
195 parts = urlparse(url)
195 parts = urlparse(url)
196 _query = frozenset(parse_qsl(parts.query))
196 _query = frozenset(parse_qsl(parts.query))
197 _path = unquote_plus(parts.path)
197 _path = unquote_plus(parts.path)
198 parts = parts._replace(query=_query, path=_path)
198 parts = parts._replace(query=_query, path=_path)
199 self.parts = parts
199 self.parts = parts
200
200
201 def __eq__(self, other):
201 def __eq__(self, other):
202 return self.parts == other.parts
202 return self.parts == other.parts
203
203
204 def __hash__(self):
204 def __hash__(self):
205 return hash(self.parts)
205 return hash(self.parts)
206
206
207
207
208 def run_test_concurrently(times, raise_catched_exc=True):
208 def run_test_concurrently(times, raise_catched_exc=True):
209 """
209 """
210 Add this decorator to small pieces of code that you want to test
210 Add this decorator to small pieces of code that you want to test
211 concurrently
211 concurrently
212
212
213 ex:
213 ex:
214
214
215 @test_concurrently(25)
215 @test_concurrently(25)
216 def my_test_function():
216 def my_test_function():
217 ...
217 ...
218 """
218 """
219 def test_concurrently_decorator(test_func):
219 def test_concurrently_decorator(test_func):
220 def wrapper(*args, **kwargs):
220 def wrapper(*args, **kwargs):
221 exceptions = []
221 exceptions = []
222
222
223 def call_test_func():
223 def call_test_func():
224 try:
224 try:
225 test_func(*args, **kwargs)
225 test_func(*args, **kwargs)
226 except Exception, e:
226 except Exception, e:
227 exceptions.append(e)
227 exceptions.append(e)
228 if raise_catched_exc:
228 if raise_catched_exc:
229 raise
229 raise
230 threads = []
230 threads = []
231 for i in range(times):
231 for i in range(times):
232 threads.append(threading.Thread(target=call_test_func))
232 threads.append(threading.Thread(target=call_test_func))
233 for t in threads:
233 for t in threads:
234 t.start()
234 t.start()
235 for t in threads:
235 for t in threads:
236 t.join()
236 t.join()
237 if exceptions:
237 if exceptions:
238 raise Exception(
238 raise Exception(
239 'test_concurrently intercepted %s exceptions: %s' % (
239 'test_concurrently intercepted %s exceptions: %s' % (
240 len(exceptions), exceptions))
240 len(exceptions), exceptions))
241 return wrapper
241 return wrapper
242 return test_concurrently_decorator
242 return test_concurrently_decorator
243
243
244
244
245 def wait_for_url(url, timeout=10):
245 def wait_for_url(url, timeout=10):
246 """
246 """
247 Wait until URL becomes reachable.
247 Wait until URL becomes reachable.
248
248
249 It polls the URL until the timeout is reached or it became reachable.
249 It polls the URL until the timeout is reached or it became reachable.
250 If will call to `py.test.fail` in case the URL is not reachable.
250 If will call to `py.test.fail` in case the URL is not reachable.
251 """
251 """
252 timeout = time.time() + timeout
252 timeout = time.time() + timeout
253 last = 0
253 last = 0
254 wait = 0.1
254 wait = 0.1
255
255
256 while (timeout > last):
256 while (timeout > last):
257 last = time.time()
257 last = time.time()
258 if is_url_reachable(url):
258 if is_url_reachable(url):
259 break
259 break
260 elif ((last + wait) > time.time()):
260 elif ((last + wait) > time.time()):
261 # Go to sleep because not enough time has passed since last check.
261 # Go to sleep because not enough time has passed since last check.
262 time.sleep(wait)
262 time.sleep(wait)
263 else:
263 else:
264 pytest.fail("Timeout while waiting for URL {}".format(url))
264 pytest.fail("Timeout while waiting for URL {}".format(url))
265
265
266
266
267 def is_url_reachable(url):
267 def is_url_reachable(url):
268 try:
268 try:
269 urllib2.urlopen(url)
269 urllib2.urlopen(url)
270 except urllib2.URLError:
270 except urllib2.URLError:
271 return False
271 return False
272 return True
272 return True
273
274
275 def get_session_from_response(response):
276 """
277 This returns the session from a response object. Pylons has some magic
278 to make the session available as `response.session`. But pyramid
279 doesn't expose it.
280 """
281 # TODO: Try to look up the session key also.
282 return response.request.environ['beaker.session']
General Comments 0
You need to be logged in to leave comments. Login now