##// END OF EJS Templates
tests: allow logging exceptions on url reach check
super-admin -
r5023:5d32bc46 default
parent child Browse files
Show More
@@ -1,486 +1,487 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess
26 26 import tempfile
27 27 import urllib.request
28 28 import urllib.error
29 29 import urllib.parse
30 30 from lxml.html import fromstring, tostring
31 31 from lxml.cssselect import CSSSelector
32 32 from urllib.parse import unquote_plus
33 33 import webob
34 34
35 35 from webtest.app import TestResponse, TestApp
36 36 from webtest.compat import print_stderr
37 37
38 38 import pytest
39 39
40 40 try:
41 41 import rc_testdata
42 42 except ImportError:
43 43 raise ImportError('Failed to import rc_testdata, '
44 44 'please make sure this package is installed from requirements_test.txt')
45 45
46 46 from rhodecode.model.db import User, Repository
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.scm import ScmModel
49 49 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
50 50 from rhodecode.lib.vcs.backends.base import EmptyCommit
51 51 from rhodecode.tests import login_user_session
52 52
53 53 log = logging.getLogger(__name__)
54 54
55 55
56 56 class CustomTestResponse(TestResponse):
57 57
58 58 def _save_output(self, out):
59 59 f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html')
60 60 f.write(out)
61 61 return f.name
62 62
63 63 def mustcontain(self, *strings, **kw):
64 64 """
65 65 Assert that the response contains all of the strings passed
66 66 in as arguments.
67 67
68 68 Equivalent to::
69 69
70 70 assert string in res
71 71 """
72 72 print_body = kw.pop('print_body', False)
73 73 if 'no' in kw:
74 74 no = kw['no']
75 75 del kw['no']
76 76 if isinstance(no, str):
77 77 no = [no]
78 78 else:
79 79 no = []
80 80 if kw:
81 81 raise TypeError(
82 82 "The only keyword argument allowed is 'no' got %s" % kw)
83 83
84 84 f = self._save_output(str(self))
85 85
86 86 for s in strings:
87 87 if s not in self:
88 88 print_stderr("Actual response (no %r):" % s)
89 89 print_stderr("body output saved as `%s`" % f)
90 90 if print_body:
91 91 print_stderr(str(self))
92 92 raise IndexError(
93 93 "Body does not contain string %r, body output saved as %s" % (s, f))
94 94
95 95 for no_s in no:
96 96 if no_s in self:
97 97 print_stderr("Actual response (has %r)" % no_s)
98 98 print_stderr("body output saved as `%s`" % f)
99 99 if print_body:
100 100 print_stderr(str(self))
101 101 raise IndexError(
102 102 "Body contains bad string %r, body output saved as %s" % (no_s, f))
103 103
104 104 def assert_response(self):
105 105 return AssertResponse(self)
106 106
107 107 def get_session_from_response(self):
108 108 """
109 109 This returns the session from a response object.
110 110 """
111 111 from rhodecode.lib.rc_beaker import session_factory_from_settings
112 112 session = session_factory_from_settings(self.test_app._pyramid_settings)
113 113 return session(self.request)
114 114
115 115
116 116 class TestRequest(webob.BaseRequest):
117 117
118 118 # for py.test, so it doesn't try to run this tas by name starting with test...
119 119 disabled = True
120 120 ResponseClass = CustomTestResponse
121 121
122 122 def add_response_callback(self, callback):
123 123 pass
124 124
125 125 @classmethod
126 126 def blank(cls, path, environ=None, base_url=None,
127 127 headers=None, POST=None, **kw):
128 128
129 129 if not path.isascii():
130 130 # our custom quote path if it contains non-ascii chars
131 131 path = urllib.parse.quote(path)
132 132
133 133 return super(TestRequest, cls).blank(
134 134 path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
135 135
136 136
137 137 class CustomTestApp(TestApp):
138 138 """
139 139 Custom app to make mustcontain more Useful, and extract special methods
140 140 """
141 141 RequestClass = TestRequest
142 142 rc_login_data = {}
143 143 rc_current_session = None
144 144
145 145 def login(self, username=None, password=None):
146 146 from rhodecode.lib import auth
147 147
148 148 if username and password:
149 149 session = login_user_session(self, username, password)
150 150 else:
151 151 session = login_user_session(self)
152 152
153 153 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
154 154 self.rc_current_session = session
155 155 return session['rhodecode_user']
156 156
157 157 @property
158 158 def csrf_token(self):
159 159 return self.rc_login_data['csrf_token']
160 160
161 161 @property
162 162 def _pyramid_registry(self):
163 163 return self.app.config.registry
164 164
165 165 @property
166 166 def _pyramid_settings(self):
167 167 return self._pyramid_registry.settings
168 168
169 169
170 170 def set_anonymous_access(enabled):
171 171 """(Dis)allows anonymous access depending on parameter `enabled`"""
172 172 user = User.get_default_user()
173 173 user.active = enabled
174 174 Session().add(user)
175 175 Session().commit()
176 176 time.sleep(1.5) # must sleep for cache (1s to expire)
177 177 log.info('anonymous access is now: %s', enabled)
178 178 assert enabled == User.get_default_user().active, (
179 179 'Cannot set anonymous access')
180 180
181 181
182 182 def check_xfail_backends(node, backend_alias):
183 183 # Using "xfail_backends" here intentionally, since this marks work
184 184 # which is "to be done" soon.
185 185 skip_marker = node.get_closest_marker('xfail_backends')
186 186 if skip_marker and backend_alias in skip_marker.args:
187 187 msg = "Support for backend %s to be developed." % (backend_alias, )
188 188 msg = skip_marker.kwargs.get('reason', msg)
189 189 pytest.xfail(msg)
190 190
191 191
192 192 def check_skip_backends(node, backend_alias):
193 193 # Using "skip_backends" here intentionally, since this marks work which is
194 194 # not supported.
195 195 skip_marker = node.get_closest_marker('skip_backends')
196 196 if skip_marker and backend_alias in skip_marker.args:
197 197 msg = "Feature not supported for backend %s." % (backend_alias, )
198 198 msg = skip_marker.kwargs.get('reason', msg)
199 199 pytest.skip(msg)
200 200
201 201
202 202 def extract_git_repo_from_dump(dump_name, repo_name):
203 203 """Create git repo `repo_name` from dump `dump_name`."""
204 204 repos_path = ScmModel().repos_path
205 205 target_path = os.path.join(repos_path, repo_name)
206 206 rc_testdata.extract_git_dump(dump_name, target_path)
207 207 return target_path
208 208
209 209
210 210 def extract_hg_repo_from_dump(dump_name, repo_name):
211 211 """Create hg repo `repo_name` from dump `dump_name`."""
212 212 repos_path = ScmModel().repos_path
213 213 target_path = os.path.join(repos_path, repo_name)
214 214 rc_testdata.extract_hg_dump(dump_name, target_path)
215 215 return target_path
216 216
217 217
218 218 def extract_svn_repo_from_dump(dump_name, repo_name):
219 219 """Create a svn repo `repo_name` from dump `dump_name`."""
220 220 repos_path = ScmModel().repos_path
221 221 target_path = os.path.join(repos_path, repo_name)
222 222 SubversionRepository(target_path, create=True)
223 223 _load_svn_dump_into_repo(dump_name, target_path)
224 224 return target_path
225 225
226 226
227 227 def assert_message_in_log(log_records, message, levelno, module):
228 228 messages = [
229 229 r.message for r in log_records
230 230 if r.module == module and r.levelno == levelno
231 231 ]
232 232 assert message in messages
233 233
234 234
235 235 def _load_svn_dump_into_repo(dump_name, repo_path):
236 236 """
237 237 Utility to populate a svn repository with a named dump
238 238
239 239 Currently the dumps are in rc_testdata. They might later on be
240 240 integrated with the main repository once they stabilize more.
241 241 """
242 242 dump = rc_testdata.load_svn_dump(dump_name)
243 243 load_dump = subprocess.Popen(
244 244 ['svnadmin', 'load', repo_path],
245 245 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
246 246 stderr=subprocess.PIPE)
247 247 out, err = load_dump.communicate(dump)
248 248 if load_dump.returncode != 0:
249 249 log.error("Output of load_dump command: %s", out)
250 250 log.error("Error output of load_dump command: %s", err)
251 251 raise Exception(
252 252 'Failed to load dump "%s" into repository at path "%s".'
253 253 % (dump_name, repo_path))
254 254
255 255
256 256 class AssertResponse(object):
257 257 """
258 258 Utility that helps to assert things about a given HTML response.
259 259 """
260 260
261 261 def __init__(self, response):
262 262 self.response = response
263 263
264 264 def get_imports(self):
265 265 return fromstring, tostring, CSSSelector
266 266
267 267 def one_element_exists(self, css_selector):
268 268 self.get_element(css_selector)
269 269
270 270 def no_element_exists(self, css_selector):
271 271 assert not self._get_elements(css_selector)
272 272
273 273 def element_equals_to(self, css_selector, expected_content):
274 274 element = self.get_element(css_selector)
275 275 element_text = self._element_to_string(element)
276 276
277 277 assert expected_content in element_text
278 278
279 279 def element_contains(self, css_selector, expected_content):
280 280 element = self.get_element(css_selector)
281 281 assert expected_content in element.text_content()
282 282
283 283 def element_value_contains(self, css_selector, expected_content):
284 284 element = self.get_element(css_selector)
285 285 assert expected_content in element.value
286 286
287 287 def contains_one_link(self, link_text, href):
288 288 fromstring, tostring, CSSSelector = self.get_imports()
289 289 doc = fromstring(self.response.body)
290 290 sel = CSSSelector('a[href]')
291 291 elements = [
292 292 e for e in sel(doc) if e.text_content().strip() == link_text]
293 293 assert len(elements) == 1, "Did not find link or found multiple links"
294 294 self._ensure_url_equal(elements[0].attrib.get('href'), href)
295 295
296 296 def contains_one_anchor(self, anchor_id):
297 297 fromstring, tostring, CSSSelector = self.get_imports()
298 298 doc = fromstring(self.response.body)
299 299 sel = CSSSelector('#' + anchor_id)
300 300 elements = sel(doc)
301 301 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
302 302
303 303 def _ensure_url_equal(self, found, expected):
304 304 assert _Url(found) == _Url(expected)
305 305
306 306 def get_element(self, css_selector):
307 307 elements = self._get_elements(css_selector)
308 308 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
309 309 return elements[0]
310 310
311 311 def get_elements(self, css_selector):
312 312 return self._get_elements(css_selector)
313 313
314 314 def _get_elements(self, css_selector):
315 315 fromstring, tostring, CSSSelector = self.get_imports()
316 316 doc = fromstring(self.response.body)
317 317 sel = CSSSelector(css_selector)
318 318 elements = sel(doc)
319 319 return elements
320 320
321 321 def _element_to_string(self, element):
322 322 fromstring, tostring, CSSSelector = self.get_imports()
323 323 return tostring(element, encoding='unicode')
324 324
325 325
326 326 class _Url(object):
327 327 """
328 328 A url object that can be compared with other url orbjects
329 329 without regard to the vagaries of encoding, escaping, and ordering
330 330 of parameters in query strings.
331 331
332 332 Inspired by
333 333 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
334 334 """
335 335
336 336 def __init__(self, url):
337 337 parts = urllib.parse.urlparse(url)
338 338 _query = frozenset(urllib.parse.parse_qsl(parts.query))
339 339 _path = unquote_plus(parts.path)
340 340 parts = parts._replace(query=_query, path=_path)
341 341 self.parts = parts
342 342
343 343 def __eq__(self, other):
344 344 return self.parts == other.parts
345 345
346 346 def __hash__(self):
347 347 return hash(self.parts)
348 348
349 349
350 350 def run_test_concurrently(times, raise_catched_exc=True):
351 351 """
352 352 Add this decorator to small pieces of code that you want to test
353 353 concurrently
354 354
355 355 ex:
356 356
357 357 @test_concurrently(25)
358 358 def my_test_function():
359 359 ...
360 360 """
361 361 def test_concurrently_decorator(test_func):
362 362 def wrapper(*args, **kwargs):
363 363 exceptions = []
364 364
365 365 def call_test_func():
366 366 try:
367 367 test_func(*args, **kwargs)
368 368 except Exception as e:
369 369 exceptions.append(e)
370 370 if raise_catched_exc:
371 371 raise
372 372 threads = []
373 373 for i in range(times):
374 374 threads.append(threading.Thread(target=call_test_func))
375 375 for t in threads:
376 376 t.start()
377 377 for t in threads:
378 378 t.join()
379 379 if exceptions:
380 380 raise Exception(
381 381 'test_concurrently intercepted %s exceptions: %s' % (
382 382 len(exceptions), exceptions))
383 383 return wrapper
384 384 return test_concurrently_decorator
385 385
386 386
387 387 def wait_for_url(url, timeout=10):
388 388 """
389 389 Wait until URL becomes reachable.
390 390
391 391 It polls the URL until the timeout is reached or it became reachable.
392 392 If will call to `py.test.fail` in case the URL is not reachable.
393 393 """
394 394 timeout = time.time() + timeout
395 395 last = 0
396 396 wait = 0.1
397 397
398 398 while timeout > last:
399 399 last = time.time()
400 if is_url_reachable(url):
400 if is_url_reachable(url, log_exc=False):
401 401 break
402 402 elif (last + wait) > time.time():
403 403 # Go to sleep because not enough time has passed since last check.
404 404 time.sleep(wait)
405 405 else:
406 pytest.fail("Timeout while waiting for URL {}".format(url))
406 pytest.fail(f"Timeout while waiting for URL {url}")
407 407
408 408
409 def is_url_reachable(url):
409 def is_url_reachable(url: str, log_exc: bool = True) -> bool:
410 410 try:
411 411 urllib.request.urlopen(url)
412 412 except urllib.error.URLError:
413 log.exception('URL `{}` reach error'.format(url))
413 if log_exc:
414 log.exception('URL `{}` reach error'.format(url))
414 415 return False
415 416 return True
416 417
417 418
418 419 def repo_on_filesystem(repo_name):
419 420 from rhodecode.lib import vcs
420 421 from rhodecode.tests import TESTS_TMP_PATH
421 422 repo = vcs.get_vcs_instance(
422 423 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
423 424 return repo is not None
424 425
425 426
426 427 def commit_change(
427 428 repo, filename, content, message, vcs_type, parent=None, newfile=False):
428 429 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
429 430
430 431 repo = Repository.get_by_repo_name(repo)
431 432 _commit = parent
432 433 if not parent:
433 434 _commit = EmptyCommit(alias=vcs_type)
434 435
435 436 if newfile:
436 437 nodes = {
437 438 filename: {
438 439 'content': content
439 440 }
440 441 }
441 442 commit = ScmModel().create_nodes(
442 443 user=TEST_USER_ADMIN_LOGIN, repo=repo,
443 444 message=message,
444 445 nodes=nodes,
445 446 parent_commit=_commit,
446 447 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
447 448 )
448 449 else:
449 450 commit = ScmModel().commit_change(
450 451 repo=repo.scm_instance(), repo_name=repo.repo_name,
451 452 commit=parent, user=TEST_USER_ADMIN_LOGIN,
452 453 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
453 454 message=message,
454 455 content=content,
455 456 f_path=filename
456 457 )
457 458 return commit
458 459
459 460
460 461 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
461 462 if not default:
462 463 raise ValueError('Permission for default user must be given')
463 464 form_data = [(
464 465 'csrf_token', csrf_token
465 466 )]
466 467 # add default
467 468 form_data.extend([
468 469 ('u_perm_1', default)
469 470 ])
470 471
471 472 if grant:
472 473 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
473 474 form_data.extend([
474 475 ('perm_new_member_perm_new{}'.format(cnt), perm),
475 476 ('perm_new_member_id_new{}'.format(cnt), obj_id),
476 477 ('perm_new_member_name_new{}'.format(cnt), obj_name),
477 478 ('perm_new_member_type_new{}'.format(cnt), obj_type),
478 479
479 480 ])
480 481 if revoke:
481 482 for obj_id, obj_type in revoke:
482 483 form_data.extend([
483 484 ('perm_del_member_id_{}'.format(obj_id), obj_id),
484 485 ('perm_del_member_type_{}'.format(obj_id), obj_type),
485 486 ])
486 487 return form_data
General Comments 0
You need to be logged in to leave comments. Login now