##// END OF EJS Templates
tests: Add a method to AssertResponse to check value of elements.
Martin Bornhold -
r1046:40393b0a default
parent child Browse files
Show More
@@ -1,293 +1,297 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import threading
21 import threading
22 import time
22 import time
23 import logging
23 import logging
24 import os.path
24 import os.path
25 import subprocess32
25 import subprocess32
26 import urllib2
26 import urllib2
27 from urlparse import urlparse, parse_qsl
27 from urlparse import urlparse, parse_qsl
28 from urllib import unquote_plus
28 from urllib import unquote_plus
29
29
30 import pytest
30 import pytest
31 import rc_testdata
31 import rc_testdata
32 from lxml.html import fromstring, tostring
32 from lxml.html import fromstring, tostring
33 from lxml.cssselect import CSSSelector
33 from lxml.cssselect import CSSSelector
34
34
35 from rhodecode.model.db import User
35 from rhodecode.model.db import User
36 from rhodecode.model.meta import Session
36 from rhodecode.model.meta import Session
37 from rhodecode.model.scm import ScmModel
37 from rhodecode.model.scm import ScmModel
38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
39
39
40
40
41 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
42
42
43
43
44 def set_anonymous_access(enabled):
44 def set_anonymous_access(enabled):
45 """(Dis)allows anonymous access depending on parameter `enabled`"""
45 """(Dis)allows anonymous access depending on parameter `enabled`"""
46 user = User.get_default_user()
46 user = User.get_default_user()
47 user.active = enabled
47 user.active = enabled
48 Session().add(user)
48 Session().add(user)
49 Session().commit()
49 Session().commit()
50 log.info('anonymous access is now: %s', enabled)
50 log.info('anonymous access is now: %s', enabled)
51 assert enabled == User.get_default_user().active, (
51 assert enabled == User.get_default_user().active, (
52 'Cannot set anonymous access')
52 'Cannot set anonymous access')
53
53
54
54
55 def check_xfail_backends(node, backend_alias):
55 def check_xfail_backends(node, backend_alias):
56 # Using "xfail_backends" here intentionally, since this marks work
56 # Using "xfail_backends" here intentionally, since this marks work
57 # which is "to be done" soon.
57 # which is "to be done" soon.
58 skip_marker = node.get_marker('xfail_backends')
58 skip_marker = node.get_marker('xfail_backends')
59 if skip_marker and backend_alias in skip_marker.args:
59 if skip_marker and backend_alias in skip_marker.args:
60 msg = "Support for backend %s to be developed." % (backend_alias, )
60 msg = "Support for backend %s to be developed." % (backend_alias, )
61 msg = skip_marker.kwargs.get('reason', msg)
61 msg = skip_marker.kwargs.get('reason', msg)
62 pytest.xfail(msg)
62 pytest.xfail(msg)
63
63
64
64
65 def check_skip_backends(node, backend_alias):
65 def check_skip_backends(node, backend_alias):
66 # Using "skip_backends" here intentionally, since this marks work which is
66 # Using "skip_backends" here intentionally, since this marks work which is
67 # not supported.
67 # not supported.
68 skip_marker = node.get_marker('skip_backends')
68 skip_marker = node.get_marker('skip_backends')
69 if skip_marker and backend_alias in skip_marker.args:
69 if skip_marker and backend_alias in skip_marker.args:
70 msg = "Feature not supported for backend %s." % (backend_alias, )
70 msg = "Feature not supported for backend %s." % (backend_alias, )
71 msg = skip_marker.kwargs.get('reason', msg)
71 msg = skip_marker.kwargs.get('reason', msg)
72 pytest.skip(msg)
72 pytest.skip(msg)
73
73
74
74
75 def extract_git_repo_from_dump(dump_name, repo_name):
75 def extract_git_repo_from_dump(dump_name, repo_name):
76 """Create git repo `repo_name` from dump `dump_name`."""
76 """Create git repo `repo_name` from dump `dump_name`."""
77 repos_path = ScmModel().repos_path
77 repos_path = ScmModel().repos_path
78 target_path = os.path.join(repos_path, repo_name)
78 target_path = os.path.join(repos_path, repo_name)
79 rc_testdata.extract_git_dump(dump_name, target_path)
79 rc_testdata.extract_git_dump(dump_name, target_path)
80 return target_path
80 return target_path
81
81
82
82
83 def extract_hg_repo_from_dump(dump_name, repo_name):
83 def extract_hg_repo_from_dump(dump_name, repo_name):
84 """Create hg repo `repo_name` from dump `dump_name`."""
84 """Create hg repo `repo_name` from dump `dump_name`."""
85 repos_path = ScmModel().repos_path
85 repos_path = ScmModel().repos_path
86 target_path = os.path.join(repos_path, repo_name)
86 target_path = os.path.join(repos_path, repo_name)
87 rc_testdata.extract_hg_dump(dump_name, target_path)
87 rc_testdata.extract_hg_dump(dump_name, target_path)
88 return target_path
88 return target_path
89
89
90
90
91 def extract_svn_repo_from_dump(dump_name, repo_name):
91 def extract_svn_repo_from_dump(dump_name, repo_name):
92 """Create a svn repo `repo_name` from dump `dump_name`."""
92 """Create a svn repo `repo_name` from dump `dump_name`."""
93 repos_path = ScmModel().repos_path
93 repos_path = ScmModel().repos_path
94 target_path = os.path.join(repos_path, repo_name)
94 target_path = os.path.join(repos_path, repo_name)
95 SubversionRepository(target_path, create=True)
95 SubversionRepository(target_path, create=True)
96 _load_svn_dump_into_repo(dump_name, target_path)
96 _load_svn_dump_into_repo(dump_name, target_path)
97 return target_path
97 return target_path
98
98
99
99
100 def assert_message_in_log(log_records, message, levelno, module):
100 def assert_message_in_log(log_records, message, levelno, module):
101 messages = [
101 messages = [
102 r.message for r in log_records
102 r.message for r in log_records
103 if r.module == module and r.levelno == levelno
103 if r.module == module and r.levelno == levelno
104 ]
104 ]
105 assert message in messages
105 assert message in messages
106
106
107
107
108 def _load_svn_dump_into_repo(dump_name, repo_path):
108 def _load_svn_dump_into_repo(dump_name, repo_path):
109 """
109 """
110 Utility to populate a svn repository with a named dump
110 Utility to populate a svn repository with a named dump
111
111
112 Currently the dumps are in rc_testdata. They might later on be
112 Currently the dumps are in rc_testdata. They might later on be
113 integrated with the main repository once they stabilize more.
113 integrated with the main repository once they stabilize more.
114 """
114 """
115 dump = rc_testdata.load_svn_dump(dump_name)
115 dump = rc_testdata.load_svn_dump(dump_name)
116 load_dump = subprocess32.Popen(
116 load_dump = subprocess32.Popen(
117 ['svnadmin', 'load', repo_path],
117 ['svnadmin', 'load', repo_path],
118 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
118 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
119 stderr=subprocess32.PIPE)
119 stderr=subprocess32.PIPE)
120 out, err = load_dump.communicate(dump)
120 out, err = load_dump.communicate(dump)
121 if load_dump.returncode != 0:
121 if load_dump.returncode != 0:
122 log.error("Output of load_dump command: %s", out)
122 log.error("Output of load_dump command: %s", out)
123 log.error("Error output of load_dump command: %s", err)
123 log.error("Error output of load_dump command: %s", err)
124 raise Exception(
124 raise Exception(
125 'Failed to load dump "%s" into repository at path "%s".'
125 'Failed to load dump "%s" into repository at path "%s".'
126 % (dump_name, repo_path))
126 % (dump_name, repo_path))
127
127
128
128
129 class AssertResponse(object):
129 class AssertResponse(object):
130 """
130 """
131 Utility that helps to assert things about a given HTML response.
131 Utility that helps to assert things about a given HTML response.
132 """
132 """
133
133
134 def __init__(self, response):
134 def __init__(self, response):
135 self.response = response
135 self.response = response
136
136
137 def one_element_exists(self, css_selector):
137 def one_element_exists(self, css_selector):
138 self.get_element(css_selector)
138 self.get_element(css_selector)
139
139
140 def no_element_exists(self, css_selector):
140 def no_element_exists(self, css_selector):
141 assert not self._get_elements(css_selector)
141 assert not self._get_elements(css_selector)
142
142
143 def element_equals_to(self, css_selector, expected_content):
143 def element_equals_to(self, css_selector, expected_content):
144 element = self.get_element(css_selector)
144 element = self.get_element(css_selector)
145 element_text = self._element_to_string(element)
145 element_text = self._element_to_string(element)
146 assert expected_content in element_text
146 assert expected_content in element_text
147
147
148 def element_contains(self, css_selector, expected_content):
148 def element_contains(self, css_selector, expected_content):
149 element = self.get_element(css_selector)
149 element = self.get_element(css_selector)
150 assert expected_content in element.text_content()
150 assert expected_content in element.text_content()
151
151
152 def element_value_contains(self, css_selector, expected_content):
153 element = self.get_element(css_selector)
154 assert expected_content in element.value
155
152 def contains_one_link(self, link_text, href):
156 def contains_one_link(self, link_text, href):
153 doc = fromstring(self.response.body)
157 doc = fromstring(self.response.body)
154 sel = CSSSelector('a[href]')
158 sel = CSSSelector('a[href]')
155 elements = [
159 elements = [
156 e for e in sel(doc) if e.text_content().strip() == link_text]
160 e for e in sel(doc) if e.text_content().strip() == link_text]
157 assert len(elements) == 1, "Did not find link or found multiple links"
161 assert len(elements) == 1, "Did not find link or found multiple links"
158 self._ensure_url_equal(elements[0].attrib.get('href'), href)
162 self._ensure_url_equal(elements[0].attrib.get('href'), href)
159
163
160 def contains_one_anchor(self, anchor_id):
164 def contains_one_anchor(self, anchor_id):
161 doc = fromstring(self.response.body)
165 doc = fromstring(self.response.body)
162 sel = CSSSelector('#' + anchor_id)
166 sel = CSSSelector('#' + anchor_id)
163 elements = sel(doc)
167 elements = sel(doc)
164 assert len(elements) == 1
168 assert len(elements) == 1
165
169
166 def _ensure_url_equal(self, found, expected):
170 def _ensure_url_equal(self, found, expected):
167 assert _Url(found) == _Url(expected)
171 assert _Url(found) == _Url(expected)
168
172
169 def get_element(self, css_selector):
173 def get_element(self, css_selector):
170 elements = self._get_elements(css_selector)
174 elements = self._get_elements(css_selector)
171 assert len(elements) == 1
175 assert len(elements) == 1
172 return elements[0]
176 return elements[0]
173
177
174 def get_elements(self, css_selector):
178 def get_elements(self, css_selector):
175 return self._get_elements(css_selector)
179 return self._get_elements(css_selector)
176
180
177 def _get_elements(self, css_selector):
181 def _get_elements(self, css_selector):
178 doc = fromstring(self.response.body)
182 doc = fromstring(self.response.body)
179 sel = CSSSelector(css_selector)
183 sel = CSSSelector(css_selector)
180 elements = sel(doc)
184 elements = sel(doc)
181 return elements
185 return elements
182
186
183 def _element_to_string(self, element):
187 def _element_to_string(self, element):
184 return tostring(element)
188 return tostring(element)
185
189
186
190
187 class _Url(object):
191 class _Url(object):
188 """
192 """
189 A url object that can be compared with other url orbjects
193 A url object that can be compared with other url orbjects
190 without regard to the vagaries of encoding, escaping, and ordering
194 without regard to the vagaries of encoding, escaping, and ordering
191 of parameters in query strings.
195 of parameters in query strings.
192
196
193 Inspired by
197 Inspired by
194 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
198 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
195 """
199 """
196
200
197 def __init__(self, url):
201 def __init__(self, url):
198 parts = urlparse(url)
202 parts = urlparse(url)
199 _query = frozenset(parse_qsl(parts.query))
203 _query = frozenset(parse_qsl(parts.query))
200 _path = unquote_plus(parts.path)
204 _path = unquote_plus(parts.path)
201 parts = parts._replace(query=_query, path=_path)
205 parts = parts._replace(query=_query, path=_path)
202 self.parts = parts
206 self.parts = parts
203
207
204 def __eq__(self, other):
208 def __eq__(self, other):
205 return self.parts == other.parts
209 return self.parts == other.parts
206
210
207 def __hash__(self):
211 def __hash__(self):
208 return hash(self.parts)
212 return hash(self.parts)
209
213
210
214
211 def run_test_concurrently(times, raise_catched_exc=True):
215 def run_test_concurrently(times, raise_catched_exc=True):
212 """
216 """
213 Add this decorator to small pieces of code that you want to test
217 Add this decorator to small pieces of code that you want to test
214 concurrently
218 concurrently
215
219
216 ex:
220 ex:
217
221
218 @test_concurrently(25)
222 @test_concurrently(25)
219 def my_test_function():
223 def my_test_function():
220 ...
224 ...
221 """
225 """
222 def test_concurrently_decorator(test_func):
226 def test_concurrently_decorator(test_func):
223 def wrapper(*args, **kwargs):
227 def wrapper(*args, **kwargs):
224 exceptions = []
228 exceptions = []
225
229
226 def call_test_func():
230 def call_test_func():
227 try:
231 try:
228 test_func(*args, **kwargs)
232 test_func(*args, **kwargs)
229 except Exception, e:
233 except Exception, e:
230 exceptions.append(e)
234 exceptions.append(e)
231 if raise_catched_exc:
235 if raise_catched_exc:
232 raise
236 raise
233 threads = []
237 threads = []
234 for i in range(times):
238 for i in range(times):
235 threads.append(threading.Thread(target=call_test_func))
239 threads.append(threading.Thread(target=call_test_func))
236 for t in threads:
240 for t in threads:
237 t.start()
241 t.start()
238 for t in threads:
242 for t in threads:
239 t.join()
243 t.join()
240 if exceptions:
244 if exceptions:
241 raise Exception(
245 raise Exception(
242 'test_concurrently intercepted %s exceptions: %s' % (
246 'test_concurrently intercepted %s exceptions: %s' % (
243 len(exceptions), exceptions))
247 len(exceptions), exceptions))
244 return wrapper
248 return wrapper
245 return test_concurrently_decorator
249 return test_concurrently_decorator
246
250
247
251
248 def wait_for_url(url, timeout=10):
252 def wait_for_url(url, timeout=10):
249 """
253 """
250 Wait until URL becomes reachable.
254 Wait until URL becomes reachable.
251
255
252 It polls the URL until the timeout is reached or it became reachable.
256 It polls the URL until the timeout is reached or it became reachable.
253 If will call to `py.test.fail` in case the URL is not reachable.
257 If will call to `py.test.fail` in case the URL is not reachable.
254 """
258 """
255 timeout = time.time() + timeout
259 timeout = time.time() + timeout
256 last = 0
260 last = 0
257 wait = 0.1
261 wait = 0.1
258
262
259 while (timeout > last):
263 while (timeout > last):
260 last = time.time()
264 last = time.time()
261 if is_url_reachable(url):
265 if is_url_reachable(url):
262 break
266 break
263 elif ((last + wait) > time.time()):
267 elif ((last + wait) > time.time()):
264 # Go to sleep because not enough time has passed since last check.
268 # Go to sleep because not enough time has passed since last check.
265 time.sleep(wait)
269 time.sleep(wait)
266 else:
270 else:
267 pytest.fail("Timeout while waiting for URL {}".format(url))
271 pytest.fail("Timeout while waiting for URL {}".format(url))
268
272
269
273
270 def is_url_reachable(url):
274 def is_url_reachable(url):
271 try:
275 try:
272 urllib2.urlopen(url)
276 urllib2.urlopen(url)
273 except urllib2.URLError:
277 except urllib2.URLError:
274 return False
278 return False
275 return True
279 return True
276
280
277
281
278 def get_session_from_response(response):
282 def get_session_from_response(response):
279 """
283 """
280 This returns the session from a response object. Pylons has some magic
284 This returns the session from a response object. Pylons has some magic
281 to make the session available as `response.session`. But pyramid
285 to make the session available as `response.session`. But pyramid
282 doesn't expose it.
286 doesn't expose it.
283 """
287 """
284 # TODO: Try to look up the session key also.
288 # TODO: Try to look up the session key also.
285 return response.request.environ['beaker.session']
289 return response.request.environ['beaker.session']
286
290
287
291
288 def repo_on_filesystem(repo_name):
292 def repo_on_filesystem(repo_name):
289 from rhodecode.lib import vcs
293 from rhodecode.lib import vcs
290 from rhodecode.tests import TESTS_TMP_PATH
294 from rhodecode.tests import TESTS_TMP_PATH
291 repo = vcs.get_vcs_instance(
295 repo = vcs.get_vcs_instance(
292 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
296 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
293 return repo is not None
297 return repo is not None
General Comments 0
You need to be logged in to leave comments. Login now