##// END OF EJS Templates
tests: added more verbose message about url reach error
marcink -
r3808:aa7b4a0e stable
parent child Browse files
Show More
@@ -1,468 +1,468 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess32
26 26 import tempfile
27 27 import urllib2
28 28 from lxml.html import fromstring, tostring
29 29 from lxml.cssselect import CSSSelector
30 30 from urlparse import urlparse, parse_qsl
31 31 from urllib import unquote_plus
32 32 import webob
33 33
34 34 from webtest.app import TestResponse, TestApp, string_types
35 35 from webtest.compat import print_stderr
36 36
37 37 import pytest
38 38 import rc_testdata
39 39
40 40 from rhodecode.model.db import User, Repository
41 41 from rhodecode.model.meta import Session
42 42 from rhodecode.model.scm import ScmModel
43 43 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
44 44 from rhodecode.lib.vcs.backends.base import EmptyCommit
45 45 from rhodecode.tests import login_user_session
46 46
47 47 log = logging.getLogger(__name__)
48 48
49 49
50 50 class CustomTestResponse(TestResponse):
51 51
52 52 def _save_output(self, out):
53 53 f = tempfile.NamedTemporaryFile(delete=False, prefix='rc-test-', suffix='.html')
54 54 f.write(out)
55 55 return f.name
56 56
57 57 def mustcontain(self, *strings, **kw):
58 58 """
59 59 Assert that the response contains all of the strings passed
60 60 in as arguments.
61 61
62 62 Equivalent to::
63 63
64 64 assert string in res
65 65 """
66 66 print_body = kw.pop('print_body', False)
67 67 if 'no' in kw:
68 68 no = kw['no']
69 69 del kw['no']
70 70 if isinstance(no, string_types):
71 71 no = [no]
72 72 else:
73 73 no = []
74 74 if kw:
75 75 raise TypeError(
76 76 "The only keyword argument allowed is 'no' got %s" % kw)
77 77
78 78 f = self._save_output(str(self))
79 79
80 80 for s in strings:
81 81 if not s in self:
82 82 print_stderr("Actual response (no %r):" % s)
83 83 print_stderr("body output saved as `%s`" % f)
84 84 if print_body:
85 85 print_stderr(str(self))
86 86 raise IndexError(
87 87 "Body does not contain string %r, body output saved as %s" % (s, f))
88 88
89 89 for no_s in no:
90 90 if no_s in self:
91 91 print_stderr("Actual response (has %r)" % no_s)
92 92 print_stderr("body output saved as `%s`" % f)
93 93 if print_body:
94 94 print_stderr(str(self))
95 95 raise IndexError(
96 96 "Body contains bad string %r, body output saved as %s" % (no_s, f))
97 97
98 98 def assert_response(self):
99 99 return AssertResponse(self)
100 100
101 101 def get_session_from_response(self):
102 102 """
103 103 This returns the session from a response object.
104 104 """
105 105 from rhodecode.lib.rc_beaker import session_factory_from_settings
106 106 session = session_factory_from_settings(self.test_app._pyramid_settings)
107 107 return session(self.request)
108 108
109 109
110 110 class TestRequest(webob.BaseRequest):
111 111
112 112 # for py.test
113 113 disabled = True
114 114 ResponseClass = CustomTestResponse
115 115
116 116 def add_response_callback(self, callback):
117 117 pass
118 118
119 119
120 120 class CustomTestApp(TestApp):
121 121 """
122 122 Custom app to make mustcontain more Useful, and extract special methods
123 123 """
124 124 RequestClass = TestRequest
125 125 rc_login_data = {}
126 126 rc_current_session = None
127 127
128 128 def login(self, username=None, password=None):
129 129 from rhodecode.lib import auth
130 130
131 131 if username and password:
132 132 session = login_user_session(self, username, password)
133 133 else:
134 134 session = login_user_session(self)
135 135
136 136 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
137 137 self.rc_current_session = session
138 138 return session['rhodecode_user']
139 139
140 140 @property
141 141 def csrf_token(self):
142 142 return self.rc_login_data['csrf_token']
143 143
144 144 @property
145 145 def _pyramid_registry(self):
146 146 return self.app.config.registry
147 147
148 148 @property
149 149 def _pyramid_settings(self):
150 150 return self._pyramid_registry.settings
151 151
152 152
153 153 def set_anonymous_access(enabled):
154 154 """(Dis)allows anonymous access depending on parameter `enabled`"""
155 155 user = User.get_default_user()
156 156 user.active = enabled
157 157 Session().add(user)
158 158 Session().commit()
159 159 time.sleep(1.5) # must sleep for cache (1s to expire)
160 160 log.info('anonymous access is now: %s', enabled)
161 161 assert enabled == User.get_default_user().active, (
162 162 'Cannot set anonymous access')
163 163
164 164
165 165 def check_xfail_backends(node, backend_alias):
166 166 # Using "xfail_backends" here intentionally, since this marks work
167 167 # which is "to be done" soon.
168 168 skip_marker = node.get_closest_marker('xfail_backends')
169 169 if skip_marker and backend_alias in skip_marker.args:
170 170 msg = "Support for backend %s to be developed." % (backend_alias, )
171 171 msg = skip_marker.kwargs.get('reason', msg)
172 172 pytest.xfail(msg)
173 173
174 174
175 175 def check_skip_backends(node, backend_alias):
176 176 # Using "skip_backends" here intentionally, since this marks work which is
177 177 # not supported.
178 178 skip_marker = node.get_closest_marker('skip_backends')
179 179 if skip_marker and backend_alias in skip_marker.args:
180 180 msg = "Feature not supported for backend %s." % (backend_alias, )
181 181 msg = skip_marker.kwargs.get('reason', msg)
182 182 pytest.skip(msg)
183 183
184 184
185 185 def extract_git_repo_from_dump(dump_name, repo_name):
186 186 """Create git repo `repo_name` from dump `dump_name`."""
187 187 repos_path = ScmModel().repos_path
188 188 target_path = os.path.join(repos_path, repo_name)
189 189 rc_testdata.extract_git_dump(dump_name, target_path)
190 190 return target_path
191 191
192 192
193 193 def extract_hg_repo_from_dump(dump_name, repo_name):
194 194 """Create hg repo `repo_name` from dump `dump_name`."""
195 195 repos_path = ScmModel().repos_path
196 196 target_path = os.path.join(repos_path, repo_name)
197 197 rc_testdata.extract_hg_dump(dump_name, target_path)
198 198 return target_path
199 199
200 200
201 201 def extract_svn_repo_from_dump(dump_name, repo_name):
202 202 """Create a svn repo `repo_name` from dump `dump_name`."""
203 203 repos_path = ScmModel().repos_path
204 204 target_path = os.path.join(repos_path, repo_name)
205 205 SubversionRepository(target_path, create=True)
206 206 _load_svn_dump_into_repo(dump_name, target_path)
207 207 return target_path
208 208
209 209
210 210 def assert_message_in_log(log_records, message, levelno, module):
211 211 messages = [
212 212 r.message for r in log_records
213 213 if r.module == module and r.levelno == levelno
214 214 ]
215 215 assert message in messages
216 216
217 217
218 218 def _load_svn_dump_into_repo(dump_name, repo_path):
219 219 """
220 220 Utility to populate a svn repository with a named dump
221 221
222 222 Currently the dumps are in rc_testdata. They might later on be
223 223 integrated with the main repository once they stabilize more.
224 224 """
225 225 dump = rc_testdata.load_svn_dump(dump_name)
226 226 load_dump = subprocess32.Popen(
227 227 ['svnadmin', 'load', repo_path],
228 228 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
229 229 stderr=subprocess32.PIPE)
230 230 out, err = load_dump.communicate(dump)
231 231 if load_dump.returncode != 0:
232 232 log.error("Output of load_dump command: %s", out)
233 233 log.error("Error output of load_dump command: %s", err)
234 234 raise Exception(
235 235 'Failed to load dump "%s" into repository at path "%s".'
236 236 % (dump_name, repo_path))
237 237
238 238
239 239 class AssertResponse(object):
240 240 """
241 241 Utility that helps to assert things about a given HTML response.
242 242 """
243 243
244 244 def __init__(self, response):
245 245 self.response = response
246 246
247 247 def get_imports(self):
248 248 return fromstring, tostring, CSSSelector
249 249
250 250 def one_element_exists(self, css_selector):
251 251 self.get_element(css_selector)
252 252
253 253 def no_element_exists(self, css_selector):
254 254 assert not self._get_elements(css_selector)
255 255
256 256 def element_equals_to(self, css_selector, expected_content):
257 257 element = self.get_element(css_selector)
258 258 element_text = self._element_to_string(element)
259 259 assert expected_content in element_text
260 260
261 261 def element_contains(self, css_selector, expected_content):
262 262 element = self.get_element(css_selector)
263 263 assert expected_content in element.text_content()
264 264
265 265 def element_value_contains(self, css_selector, expected_content):
266 266 element = self.get_element(css_selector)
267 267 assert expected_content in element.value
268 268
269 269 def contains_one_link(self, link_text, href):
270 270 fromstring, tostring, CSSSelector = self.get_imports()
271 271 doc = fromstring(self.response.body)
272 272 sel = CSSSelector('a[href]')
273 273 elements = [
274 274 e for e in sel(doc) if e.text_content().strip() == link_text]
275 275 assert len(elements) == 1, "Did not find link or found multiple links"
276 276 self._ensure_url_equal(elements[0].attrib.get('href'), href)
277 277
278 278 def contains_one_anchor(self, anchor_id):
279 279 fromstring, tostring, CSSSelector = self.get_imports()
280 280 doc = fromstring(self.response.body)
281 281 sel = CSSSelector('#' + anchor_id)
282 282 elements = sel(doc)
283 283 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
284 284
285 285 def _ensure_url_equal(self, found, expected):
286 286 assert _Url(found) == _Url(expected)
287 287
288 288 def get_element(self, css_selector):
289 289 elements = self._get_elements(css_selector)
290 290 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
291 291 return elements[0]
292 292
293 293 def get_elements(self, css_selector):
294 294 return self._get_elements(css_selector)
295 295
296 296 def _get_elements(self, css_selector):
297 297 fromstring, tostring, CSSSelector = self.get_imports()
298 298 doc = fromstring(self.response.body)
299 299 sel = CSSSelector(css_selector)
300 300 elements = sel(doc)
301 301 return elements
302 302
303 303 def _element_to_string(self, element):
304 304 fromstring, tostring, CSSSelector = self.get_imports()
305 305 return tostring(element)
306 306
307 307
308 308 class _Url(object):
309 309 """
310 310 A url object that can be compared with other url orbjects
311 311 without regard to the vagaries of encoding, escaping, and ordering
312 312 of parameters in query strings.
313 313
314 314 Inspired by
315 315 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
316 316 """
317 317
318 318 def __init__(self, url):
319 319 parts = urlparse(url)
320 320 _query = frozenset(parse_qsl(parts.query))
321 321 _path = unquote_plus(parts.path)
322 322 parts = parts._replace(query=_query, path=_path)
323 323 self.parts = parts
324 324
325 325 def __eq__(self, other):
326 326 return self.parts == other.parts
327 327
328 328 def __hash__(self):
329 329 return hash(self.parts)
330 330
331 331
332 332 def run_test_concurrently(times, raise_catched_exc=True):
333 333 """
334 334 Add this decorator to small pieces of code that you want to test
335 335 concurrently
336 336
337 337 ex:
338 338
339 339 @test_concurrently(25)
340 340 def my_test_function():
341 341 ...
342 342 """
343 343 def test_concurrently_decorator(test_func):
344 344 def wrapper(*args, **kwargs):
345 345 exceptions = []
346 346
347 347 def call_test_func():
348 348 try:
349 349 test_func(*args, **kwargs)
350 350 except Exception as e:
351 351 exceptions.append(e)
352 352 if raise_catched_exc:
353 353 raise
354 354 threads = []
355 355 for i in range(times):
356 356 threads.append(threading.Thread(target=call_test_func))
357 357 for t in threads:
358 358 t.start()
359 359 for t in threads:
360 360 t.join()
361 361 if exceptions:
362 362 raise Exception(
363 363 'test_concurrently intercepted %s exceptions: %s' % (
364 364 len(exceptions), exceptions))
365 365 return wrapper
366 366 return test_concurrently_decorator
367 367
368 368
369 369 def wait_for_url(url, timeout=10):
370 370 """
371 371 Wait until URL becomes reachable.
372 372
373 373 It polls the URL until the timeout is reached or it became reachable.
374 374 If will call to `py.test.fail` in case the URL is not reachable.
375 375 """
376 376 timeout = time.time() + timeout
377 377 last = 0
378 378 wait = 0.1
379 379
380 380 while timeout > last:
381 381 last = time.time()
382 382 if is_url_reachable(url):
383 383 break
384 384 elif (last + wait) > time.time():
385 385 # Go to sleep because not enough time has passed since last check.
386 386 time.sleep(wait)
387 387 else:
388 388 pytest.fail("Timeout while waiting for URL {}".format(url))
389 389
390 390
391 391 def is_url_reachable(url):
392 392 try:
393 393 urllib2.urlopen(url)
394 394 except urllib2.URLError:
395 log.exception('URL Reach error')
395 log.exception('URL `{}` reach error'.format(url))
396 396 return False
397 397 return True
398 398
399 399
400 400 def repo_on_filesystem(repo_name):
401 401 from rhodecode.lib import vcs
402 402 from rhodecode.tests import TESTS_TMP_PATH
403 403 repo = vcs.get_vcs_instance(
404 404 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
405 405 return repo is not None
406 406
407 407
408 408 def commit_change(
409 409 repo, filename, content, message, vcs_type, parent=None, newfile=False):
410 410 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
411 411
412 412 repo = Repository.get_by_repo_name(repo)
413 413 _commit = parent
414 414 if not parent:
415 415 _commit = EmptyCommit(alias=vcs_type)
416 416
417 417 if newfile:
418 418 nodes = {
419 419 filename: {
420 420 'content': content
421 421 }
422 422 }
423 423 commit = ScmModel().create_nodes(
424 424 user=TEST_USER_ADMIN_LOGIN, repo=repo,
425 425 message=message,
426 426 nodes=nodes,
427 427 parent_commit=_commit,
428 428 author=TEST_USER_ADMIN_LOGIN,
429 429 )
430 430 else:
431 431 commit = ScmModel().commit_change(
432 432 repo=repo.scm_instance(), repo_name=repo.repo_name,
433 433 commit=parent, user=TEST_USER_ADMIN_LOGIN,
434 434 author=TEST_USER_ADMIN_LOGIN,
435 435 message=message,
436 436 content=content,
437 437 f_path=filename
438 438 )
439 439 return commit
440 440
441 441
442 442 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
443 443 if not default:
444 444 raise ValueError('Permission for default user must be given')
445 445 form_data = [(
446 446 'csrf_token', csrf_token
447 447 )]
448 448 # add default
449 449 form_data.extend([
450 450 ('u_perm_1', default)
451 451 ])
452 452
453 453 if grant:
454 454 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
455 455 form_data.extend([
456 456 ('perm_new_member_perm_new{}'.format(cnt), perm),
457 457 ('perm_new_member_id_new{}'.format(cnt), obj_id),
458 458 ('perm_new_member_name_new{}'.format(cnt), obj_name),
459 459 ('perm_new_member_type_new{}'.format(cnt), obj_type),
460 460
461 461 ])
462 462 if revoke:
463 463 for obj_id, obj_type in revoke:
464 464 form_data.extend([
465 465 ('perm_del_member_id_{}'.format(obj_id), obj_id),
466 466 ('perm_del_member_type_{}'.format(obj_id), obj_type),
467 467 ])
468 468 return form_data
General Comments 0
You need to be logged in to leave comments. Login now