##// END OF EJS Templates
test: moved lxml to local imports since this only exists during tests.
marcink -
r1239:95fd13ba default
parent child Browse files
Show More
@@ -1,297 +1,304 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess32
26 26 import urllib2
27 27 from urlparse import urlparse, parse_qsl
28 28 from urllib import unquote_plus
29 29
30 30 import pytest
31 31 import rc_testdata
32 from lxml.html import fromstring, tostring
33 from lxml.cssselect import CSSSelector
34 32
35 33 from rhodecode.model.db import User
36 34 from rhodecode.model.meta import Session
37 35 from rhodecode.model.scm import ScmModel
38 36 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
39 37
40 38
41 39 log = logging.getLogger(__name__)
42 40
43 41
44 42 def set_anonymous_access(enabled):
45 43 """(Dis)allows anonymous access depending on parameter `enabled`"""
46 44 user = User.get_default_user()
47 45 user.active = enabled
48 46 Session().add(user)
49 47 Session().commit()
50 48 log.info('anonymous access is now: %s', enabled)
51 49 assert enabled == User.get_default_user().active, (
52 50 'Cannot set anonymous access')
53 51
54 52
55 53 def check_xfail_backends(node, backend_alias):
56 54 # Using "xfail_backends" here intentionally, since this marks work
57 55 # which is "to be done" soon.
58 56 skip_marker = node.get_marker('xfail_backends')
59 57 if skip_marker and backend_alias in skip_marker.args:
60 58 msg = "Support for backend %s to be developed." % (backend_alias, )
61 59 msg = skip_marker.kwargs.get('reason', msg)
62 60 pytest.xfail(msg)
63 61
64 62
65 63 def check_skip_backends(node, backend_alias):
66 64 # Using "skip_backends" here intentionally, since this marks work which is
67 65 # not supported.
68 66 skip_marker = node.get_marker('skip_backends')
69 67 if skip_marker and backend_alias in skip_marker.args:
70 68 msg = "Feature not supported for backend %s." % (backend_alias, )
71 69 msg = skip_marker.kwargs.get('reason', msg)
72 70 pytest.skip(msg)
73 71
74 72
75 73 def extract_git_repo_from_dump(dump_name, repo_name):
76 74 """Create git repo `repo_name` from dump `dump_name`."""
77 75 repos_path = ScmModel().repos_path
78 76 target_path = os.path.join(repos_path, repo_name)
79 77 rc_testdata.extract_git_dump(dump_name, target_path)
80 78 return target_path
81 79
82 80
83 81 def extract_hg_repo_from_dump(dump_name, repo_name):
84 82 """Create hg repo `repo_name` from dump `dump_name`."""
85 83 repos_path = ScmModel().repos_path
86 84 target_path = os.path.join(repos_path, repo_name)
87 85 rc_testdata.extract_hg_dump(dump_name, target_path)
88 86 return target_path
89 87
90 88
91 89 def extract_svn_repo_from_dump(dump_name, repo_name):
92 90 """Create a svn repo `repo_name` from dump `dump_name`."""
93 91 repos_path = ScmModel().repos_path
94 92 target_path = os.path.join(repos_path, repo_name)
95 93 SubversionRepository(target_path, create=True)
96 94 _load_svn_dump_into_repo(dump_name, target_path)
97 95 return target_path
98 96
99 97
100 98 def assert_message_in_log(log_records, message, levelno, module):
101 99 messages = [
102 100 r.message for r in log_records
103 101 if r.module == module and r.levelno == levelno
104 102 ]
105 103 assert message in messages
106 104
107 105
108 106 def _load_svn_dump_into_repo(dump_name, repo_path):
109 107 """
110 108 Utility to populate a svn repository with a named dump
111 109
112 110 Currently the dumps are in rc_testdata. They might later on be
113 111 integrated with the main repository once they stabilize more.
114 112 """
115 113 dump = rc_testdata.load_svn_dump(dump_name)
116 114 load_dump = subprocess32.Popen(
117 115 ['svnadmin', 'load', repo_path],
118 116 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
119 117 stderr=subprocess32.PIPE)
120 118 out, err = load_dump.communicate(dump)
121 119 if load_dump.returncode != 0:
122 120 log.error("Output of load_dump command: %s", out)
123 121 log.error("Error output of load_dump command: %s", err)
124 122 raise Exception(
125 123 'Failed to load dump "%s" into repository at path "%s".'
126 124 % (dump_name, repo_path))
127 125
128 126
129 127 class AssertResponse(object):
130 128 """
131 129 Utility that helps to assert things about a given HTML response.
132 130 """
133 131
134 132 def __init__(self, response):
135 133 self.response = response
136 134
135 def get_imports(self):
136 from lxml.html import fromstring, tostring
137 from lxml.cssselect import CSSSelector
138 return fromstring, tostring, CSSSelector
139
137 140 def one_element_exists(self, css_selector):
138 141 self.get_element(css_selector)
139 142
140 143 def no_element_exists(self, css_selector):
141 144 assert not self._get_elements(css_selector)
142 145
143 146 def element_equals_to(self, css_selector, expected_content):
144 147 element = self.get_element(css_selector)
145 148 element_text = self._element_to_string(element)
146 149 assert expected_content in element_text
147 150
148 151 def element_contains(self, css_selector, expected_content):
149 152 element = self.get_element(css_selector)
150 153 assert expected_content in element.text_content()
151 154
152 155 def element_value_contains(self, css_selector, expected_content):
153 156 element = self.get_element(css_selector)
154 157 assert expected_content in element.value
155 158
156 159 def contains_one_link(self, link_text, href):
160 fromstring, tostring, CSSSelector = self.get_imports()
157 161 doc = fromstring(self.response.body)
158 162 sel = CSSSelector('a[href]')
159 163 elements = [
160 164 e for e in sel(doc) if e.text_content().strip() == link_text]
161 165 assert len(elements) == 1, "Did not find link or found multiple links"
162 166 self._ensure_url_equal(elements[0].attrib.get('href'), href)
163 167
164 168 def contains_one_anchor(self, anchor_id):
169 fromstring, tostring, CSSSelector = self.get_imports()
165 170 doc = fromstring(self.response.body)
166 171 sel = CSSSelector('#' + anchor_id)
167 172 elements = sel(doc)
168 173 assert len(elements) == 1
169 174
170 175 def _ensure_url_equal(self, found, expected):
171 176 assert _Url(found) == _Url(expected)
172 177
173 178 def get_element(self, css_selector):
174 179 elements = self._get_elements(css_selector)
175 180 assert len(elements) == 1
176 181 return elements[0]
177 182
178 183 def get_elements(self, css_selector):
179 184 return self._get_elements(css_selector)
180 185
181 186 def _get_elements(self, css_selector):
187 fromstring, tostring, CSSSelector = self.get_imports()
182 188 doc = fromstring(self.response.body)
183 189 sel = CSSSelector(css_selector)
184 190 elements = sel(doc)
185 191 return elements
186 192
187 193 def _element_to_string(self, element):
194 fromstring, tostring, CSSSelector = self.get_imports()
188 195 return tostring(element)
189 196
190 197
191 198 class _Url(object):
192 199 """
193 200 A url object that can be compared with other url orbjects
194 201 without regard to the vagaries of encoding, escaping, and ordering
195 202 of parameters in query strings.
196 203
197 204 Inspired by
198 205 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
199 206 """
200 207
201 208 def __init__(self, url):
202 209 parts = urlparse(url)
203 210 _query = frozenset(parse_qsl(parts.query))
204 211 _path = unquote_plus(parts.path)
205 212 parts = parts._replace(query=_query, path=_path)
206 213 self.parts = parts
207 214
208 215 def __eq__(self, other):
209 216 return self.parts == other.parts
210 217
211 218 def __hash__(self):
212 219 return hash(self.parts)
213 220
214 221
215 222 def run_test_concurrently(times, raise_catched_exc=True):
216 223 """
217 224 Add this decorator to small pieces of code that you want to test
218 225 concurrently
219 226
220 227 ex:
221 228
222 229 @test_concurrently(25)
223 230 def my_test_function():
224 231 ...
225 232 """
226 233 def test_concurrently_decorator(test_func):
227 234 def wrapper(*args, **kwargs):
228 235 exceptions = []
229 236
230 237 def call_test_func():
231 238 try:
232 239 test_func(*args, **kwargs)
233 except Exception, e:
240 except Exception as e:
234 241 exceptions.append(e)
235 242 if raise_catched_exc:
236 243 raise
237 244 threads = []
238 245 for i in range(times):
239 246 threads.append(threading.Thread(target=call_test_func))
240 247 for t in threads:
241 248 t.start()
242 249 for t in threads:
243 250 t.join()
244 251 if exceptions:
245 252 raise Exception(
246 253 'test_concurrently intercepted %s exceptions: %s' % (
247 254 len(exceptions), exceptions))
248 255 return wrapper
249 256 return test_concurrently_decorator
250 257
251 258
252 259 def wait_for_url(url, timeout=10):
253 260 """
254 261 Wait until URL becomes reachable.
255 262
256 263 It polls the URL until the timeout is reached or it became reachable.
257 264 If will call to `py.test.fail` in case the URL is not reachable.
258 265 """
259 266 timeout = time.time() + timeout
260 267 last = 0
261 268 wait = 0.1
262 269
263 while (timeout > last):
270 while timeout > last:
264 271 last = time.time()
265 272 if is_url_reachable(url):
266 273 break
267 elif ((last + wait) > time.time()):
274 elif (last + wait) > time.time():
268 275 # Go to sleep because not enough time has passed since last check.
269 276 time.sleep(wait)
270 277 else:
271 278 pytest.fail("Timeout while waiting for URL {}".format(url))
272 279
273 280
274 281 def is_url_reachable(url):
275 282 try:
276 283 urllib2.urlopen(url)
277 284 except urllib2.URLError:
278 285 return False
279 286 return True
280 287
281 288
282 289 def get_session_from_response(response):
283 290 """
284 291 This returns the session from a response object. Pylons has some magic
285 292 to make the session available as `response.session`. But pyramid
286 293 doesn't expose it.
287 294 """
288 295 # TODO: Try to look up the session key also.
289 296 return response.request.environ['beaker.session']
290 297
291 298
292 299 def repo_on_filesystem(repo_name):
293 300 from rhodecode.lib import vcs
294 301 from rhodecode.tests import TESTS_TMP_PATH
295 302 repo = vcs.get_vcs_instance(
296 303 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
297 304 return repo is not None
General Comments 0
You need to be logged in to leave comments. Login now