##// END OF EJS Templates
tests: Add a method to AssertResponse to check value of elements.
Martin Bornhold -
r1046:40393b0a default
parent child Browse files
Show More
@@ -1,293 +1,297 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess32
26 26 import urllib2
27 27 from urlparse import urlparse, parse_qsl
28 28 from urllib import unquote_plus
29 29
30 30 import pytest
31 31 import rc_testdata
32 32 from lxml.html import fromstring, tostring
33 33 from lxml.cssselect import CSSSelector
34 34
35 35 from rhodecode.model.db import User
36 36 from rhodecode.model.meta import Session
37 37 from rhodecode.model.scm import ScmModel
38 38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
39 39
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 def set_anonymous_access(enabled):
45 45 """(Dis)allows anonymous access depending on parameter `enabled`"""
46 46 user = User.get_default_user()
47 47 user.active = enabled
48 48 Session().add(user)
49 49 Session().commit()
50 50 log.info('anonymous access is now: %s', enabled)
51 51 assert enabled == User.get_default_user().active, (
52 52 'Cannot set anonymous access')
53 53
54 54
55 55 def check_xfail_backends(node, backend_alias):
56 56 # Using "xfail_backends" here intentionally, since this marks work
57 57 # which is "to be done" soon.
58 58 skip_marker = node.get_marker('xfail_backends')
59 59 if skip_marker and backend_alias in skip_marker.args:
60 60 msg = "Support for backend %s to be developed." % (backend_alias, )
61 61 msg = skip_marker.kwargs.get('reason', msg)
62 62 pytest.xfail(msg)
63 63
64 64
65 65 def check_skip_backends(node, backend_alias):
66 66 # Using "skip_backends" here intentionally, since this marks work which is
67 67 # not supported.
68 68 skip_marker = node.get_marker('skip_backends')
69 69 if skip_marker and backend_alias in skip_marker.args:
70 70 msg = "Feature not supported for backend %s." % (backend_alias, )
71 71 msg = skip_marker.kwargs.get('reason', msg)
72 72 pytest.skip(msg)
73 73
74 74
75 75 def extract_git_repo_from_dump(dump_name, repo_name):
76 76 """Create git repo `repo_name` from dump `dump_name`."""
77 77 repos_path = ScmModel().repos_path
78 78 target_path = os.path.join(repos_path, repo_name)
79 79 rc_testdata.extract_git_dump(dump_name, target_path)
80 80 return target_path
81 81
82 82
83 83 def extract_hg_repo_from_dump(dump_name, repo_name):
84 84 """Create hg repo `repo_name` from dump `dump_name`."""
85 85 repos_path = ScmModel().repos_path
86 86 target_path = os.path.join(repos_path, repo_name)
87 87 rc_testdata.extract_hg_dump(dump_name, target_path)
88 88 return target_path
89 89
90 90
91 91 def extract_svn_repo_from_dump(dump_name, repo_name):
92 92 """Create a svn repo `repo_name` from dump `dump_name`."""
93 93 repos_path = ScmModel().repos_path
94 94 target_path = os.path.join(repos_path, repo_name)
95 95 SubversionRepository(target_path, create=True)
96 96 _load_svn_dump_into_repo(dump_name, target_path)
97 97 return target_path
98 98
99 99
100 100 def assert_message_in_log(log_records, message, levelno, module):
101 101 messages = [
102 102 r.message for r in log_records
103 103 if r.module == module and r.levelno == levelno
104 104 ]
105 105 assert message in messages
106 106
107 107
108 108 def _load_svn_dump_into_repo(dump_name, repo_path):
109 109 """
110 110 Utility to populate a svn repository with a named dump
111 111
112 112 Currently the dumps are in rc_testdata. They might later on be
113 113 integrated with the main repository once they stabilize more.
114 114 """
115 115 dump = rc_testdata.load_svn_dump(dump_name)
116 116 load_dump = subprocess32.Popen(
117 117 ['svnadmin', 'load', repo_path],
118 118 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
119 119 stderr=subprocess32.PIPE)
120 120 out, err = load_dump.communicate(dump)
121 121 if load_dump.returncode != 0:
122 122 log.error("Output of load_dump command: %s", out)
123 123 log.error("Error output of load_dump command: %s", err)
124 124 raise Exception(
125 125 'Failed to load dump "%s" into repository at path "%s".'
126 126 % (dump_name, repo_path))
127 127
128 128
129 129 class AssertResponse(object):
130 130 """
131 131 Utility that helps to assert things about a given HTML response.
132 132 """
133 133
134 134 def __init__(self, response):
135 135 self.response = response
136 136
137 137 def one_element_exists(self, css_selector):
138 138 self.get_element(css_selector)
139 139
140 140 def no_element_exists(self, css_selector):
141 141 assert not self._get_elements(css_selector)
142 142
143 143 def element_equals_to(self, css_selector, expected_content):
144 144 element = self.get_element(css_selector)
145 145 element_text = self._element_to_string(element)
146 146 assert expected_content in element_text
147 147
148 148 def element_contains(self, css_selector, expected_content):
149 149 element = self.get_element(css_selector)
150 150 assert expected_content in element.text_content()
151 151
152 def element_value_contains(self, css_selector, expected_content):
153 element = self.get_element(css_selector)
154 assert expected_content in element.value
155
152 156 def contains_one_link(self, link_text, href):
153 157 doc = fromstring(self.response.body)
154 158 sel = CSSSelector('a[href]')
155 159 elements = [
156 160 e for e in sel(doc) if e.text_content().strip() == link_text]
157 161 assert len(elements) == 1, "Did not find link or found multiple links"
158 162 self._ensure_url_equal(elements[0].attrib.get('href'), href)
159 163
160 164 def contains_one_anchor(self, anchor_id):
161 165 doc = fromstring(self.response.body)
162 166 sel = CSSSelector('#' + anchor_id)
163 167 elements = sel(doc)
164 168 assert len(elements) == 1
165 169
166 170 def _ensure_url_equal(self, found, expected):
167 171 assert _Url(found) == _Url(expected)
168 172
169 173 def get_element(self, css_selector):
170 174 elements = self._get_elements(css_selector)
171 175 assert len(elements) == 1
172 176 return elements[0]
173 177
174 178 def get_elements(self, css_selector):
175 179 return self._get_elements(css_selector)
176 180
177 181 def _get_elements(self, css_selector):
178 182 doc = fromstring(self.response.body)
179 183 sel = CSSSelector(css_selector)
180 184 elements = sel(doc)
181 185 return elements
182 186
183 187 def _element_to_string(self, element):
184 188 return tostring(element)
185 189
186 190
187 191 class _Url(object):
188 192 """
189 193 A url object that can be compared with other url orbjects
190 194 without regard to the vagaries of encoding, escaping, and ordering
191 195 of parameters in query strings.
192 196
193 197 Inspired by
194 198 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
195 199 """
196 200
197 201 def __init__(self, url):
198 202 parts = urlparse(url)
199 203 _query = frozenset(parse_qsl(parts.query))
200 204 _path = unquote_plus(parts.path)
201 205 parts = parts._replace(query=_query, path=_path)
202 206 self.parts = parts
203 207
204 208 def __eq__(self, other):
205 209 return self.parts == other.parts
206 210
207 211 def __hash__(self):
208 212 return hash(self.parts)
209 213
210 214
211 215 def run_test_concurrently(times, raise_catched_exc=True):
212 216 """
213 217 Add this decorator to small pieces of code that you want to test
214 218 concurrently
215 219
216 220 ex:
217 221
218 222 @test_concurrently(25)
219 223 def my_test_function():
220 224 ...
221 225 """
222 226 def test_concurrently_decorator(test_func):
223 227 def wrapper(*args, **kwargs):
224 228 exceptions = []
225 229
226 230 def call_test_func():
227 231 try:
228 232 test_func(*args, **kwargs)
229 233 except Exception, e:
230 234 exceptions.append(e)
231 235 if raise_catched_exc:
232 236 raise
233 237 threads = []
234 238 for i in range(times):
235 239 threads.append(threading.Thread(target=call_test_func))
236 240 for t in threads:
237 241 t.start()
238 242 for t in threads:
239 243 t.join()
240 244 if exceptions:
241 245 raise Exception(
242 246 'test_concurrently intercepted %s exceptions: %s' % (
243 247 len(exceptions), exceptions))
244 248 return wrapper
245 249 return test_concurrently_decorator
246 250
247 251
248 252 def wait_for_url(url, timeout=10):
249 253 """
250 254 Wait until URL becomes reachable.
251 255
252 256 It polls the URL until the timeout is reached or it became reachable.
253 257 If will call to `py.test.fail` in case the URL is not reachable.
254 258 """
255 259 timeout = time.time() + timeout
256 260 last = 0
257 261 wait = 0.1
258 262
259 263 while (timeout > last):
260 264 last = time.time()
261 265 if is_url_reachable(url):
262 266 break
263 267 elif ((last + wait) > time.time()):
264 268 # Go to sleep because not enough time has passed since last check.
265 269 time.sleep(wait)
266 270 else:
267 271 pytest.fail("Timeout while waiting for URL {}".format(url))
268 272
269 273
270 274 def is_url_reachable(url):
271 275 try:
272 276 urllib2.urlopen(url)
273 277 except urllib2.URLError:
274 278 return False
275 279 return True
276 280
277 281
278 282 def get_session_from_response(response):
279 283 """
280 284 This returns the session from a response object. Pylons has some magic
281 285 to make the session available as `response.session`. But pyramid
282 286 doesn't expose it.
283 287 """
284 288 # TODO: Try to look up the session key also.
285 289 return response.request.environ['beaker.session']
286 290
287 291
288 292 def repo_on_filesystem(repo_name):
289 293 from rhodecode.lib import vcs
290 294 from rhodecode.tests import TESTS_TMP_PATH
291 295 repo = vcs.get_vcs_instance(
292 296 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
293 297 return repo is not None
General Comments 0
You need to be logged in to leave comments. Login now