##// END OF EJS Templates
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
super-admin -
r5190:9f610147 default
parent child Browse files
Show More
@@ -1,486 +1,493 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import threading
20 20 import time
21 import sys
21 22 import logging
22 23 import os.path
23 24 import subprocess
24 25 import tempfile
25 26 import urllib.request
26 27 import urllib.error
27 28 import urllib.parse
28 29 from lxml.html import fromstring, tostring
29 30 from lxml.cssselect import CSSSelector
30 31 from urllib.parse import unquote_plus
31 32 import webob
32 33
33 34 from webtest.app import TestResponse, TestApp
34 from webtest.compat import print_stderr
35
35 36
36 37 import pytest
37 38
38 39 try:
39 40 import rc_testdata
40 41 except ImportError:
41 42 raise ImportError('Failed to import rc_testdata, '
42 43 'please make sure this package is installed from requirements_test.txt')
43 44
44 45 from rhodecode.model.db import User, Repository
45 46 from rhodecode.model.meta import Session
46 47 from rhodecode.model.scm import ScmModel
47 48 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
48 49 from rhodecode.lib.vcs.backends.base import EmptyCommit
49 50 from rhodecode.tests import login_user_session
50 51
51 52 log = logging.getLogger(__name__)
52 53
53 54
55 def print_to_func(value, print_to=sys.stderr):
56 print(value, file=print_to)
57
58
54 59 class CustomTestResponse(TestResponse):
55 60
56 61 def _save_output(self, out):
57 62 f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html')
58 63 f.write(out)
59 64 return f.name
60 65
61 66 def mustcontain(self, *strings, **kw):
62 67 """
63 Assert that the response contains all of the strings passed
68 Assert that the response contains all the strings passed
64 69 in as arguments.
65 70
66 71 Equivalent to::
67 72
68 73 assert string in res
69 74 """
70 75 print_body = kw.pop('print_body', False)
76 print_to = kw.pop('print_to', sys.stderr)
77
71 78 if 'no' in kw:
72 79 no = kw['no']
73 80 del kw['no']
74 81 if isinstance(no, str):
75 82 no = [no]
76 83 else:
77 84 no = []
78 85 if kw:
79 86 raise TypeError(f"The only keyword argument allowed is 'no' got {kw}")
80 87
81 88 f = self._save_output(str(self))
82 89
83 90 for s in strings:
84 91 if s not in self:
85 print_stderr(f"Actual response (no {s!r}):")
86 print_stderr(f"body output saved as `{f}`")
92 print_to_func(f"Actual response (no {s!r}):", print_to=print_to)
93 print_to_func(f"body output saved as `{f}`", print_to=print_to)
87 94 if print_body:
88 print_stderr(str(self))
95 print_to_func(str(self), print_to=print_to)
89 96 raise IndexError(f"Body does not contain string {s!r}, body output saved as {f}")
90 97
91 98 for no_s in no:
92 99 if no_s in self:
93 print_stderr(f"Actual response (has {no_s!r})")
94 print_stderr(f"body output saved as `{f}`")
100 print_to_func(f"Actual response (has {no_s!r})", print_to=print_to)
101 print_to_func(f"body output saved as `{f}`", print_to=print_to)
95 102 if print_body:
96 print_stderr(str(self))
103 print_to_func(str(self), print_to=print_to)
97 104 raise IndexError(f"Body contains bad string {no_s!r}, body output saved as {f}")
98 105
99 106 def assert_response(self):
100 107 return AssertResponse(self)
101 108
102 109 def get_session_from_response(self):
103 110 """
104 111 This returns the session from a response object.
105 112 """
106 113 from rhodecode.lib.rc_beaker import session_factory_from_settings
107 114 session = session_factory_from_settings(self.test_app._pyramid_settings)
108 115 return session(self.request)
109 116
110 117
111 118 class TestRequest(webob.BaseRequest):
112 119
113 120 # for py.test, so it doesn't try to run this tas by name starting with test...
114 121 disabled = True
115 122 ResponseClass = CustomTestResponse
116 123
117 124 def add_response_callback(self, callback):
118 125 pass
119 126
120 127 @classmethod
121 128 def blank(cls, path, environ=None, base_url=None,
122 129 headers=None, POST=None, **kw):
123 130
124 131 if not path.isascii():
125 132 # our custom quote path if it contains non-ascii chars
126 133 path = urllib.parse.quote(path)
127 134
128 135 return super(TestRequest, cls).blank(
129 136 path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
130 137
131 138
132 139 class CustomTestApp(TestApp):
133 140 """
134 141 Custom app to make mustcontain more Useful, and extract special methods
135 142 """
136 143 RequestClass = TestRequest
137 144 rc_login_data = {}
138 145 rc_current_session = None
139 146
140 147 def login(self, username=None, password=None):
141 148 from rhodecode.lib import auth
142 149
143 150 if username and password:
144 151 session = login_user_session(self, username, password)
145 152 else:
146 153 session = login_user_session(self)
147 154
148 155 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
149 156 self.rc_current_session = session
150 157 return session['rhodecode_user']
151 158
152 159 @property
153 160 def csrf_token(self):
154 161 return self.rc_login_data['csrf_token']
155 162
156 163 @property
157 164 def _pyramid_registry(self):
158 165 return self.app.config.registry
159 166
160 167 @property
161 168 def _pyramid_settings(self):
162 169 return self._pyramid_registry.settings
163 170
164 171 def do_request(self, req, status=None, expect_errors=None):
165 172 # you can put custom code here
166 173 return super().do_request(req, status, expect_errors)
167 174
168 175
169 176 def set_anonymous_access(enabled):
170 177 """(Dis)allows anonymous access depending on parameter `enabled`"""
171 178 user = User.get_default_user()
172 179 user.active = enabled
173 180 Session().add(user)
174 181 Session().commit()
175 182 time.sleep(1.5) # must sleep for cache (1s to expire)
176 183 log.info('anonymous access is now: %s', enabled)
177 184 assert enabled == User.get_default_user().active, (
178 185 'Cannot set anonymous access')
179 186
180 187
181 188 def check_xfail_backends(node, backend_alias):
182 189 # Using "xfail_backends" here intentionally, since this marks work
183 190 # which is "to be done" soon.
184 191 skip_marker = node.get_closest_marker('xfail_backends')
185 192 if skip_marker and backend_alias in skip_marker.args:
186 193 msg = "Support for backend %s to be developed." % (backend_alias, )
187 194 msg = skip_marker.kwargs.get('reason', msg)
188 195 pytest.xfail(msg)
189 196
190 197
191 198 def check_skip_backends(node, backend_alias):
192 199 # Using "skip_backends" here intentionally, since this marks work which is
193 200 # not supported.
194 201 skip_marker = node.get_closest_marker('skip_backends')
195 202 if skip_marker and backend_alias in skip_marker.args:
196 203 msg = "Feature not supported for backend %s." % (backend_alias, )
197 204 msg = skip_marker.kwargs.get('reason', msg)
198 205 pytest.skip(msg)
199 206
200 207
201 208 def extract_git_repo_from_dump(dump_name, repo_name):
202 209 """Create git repo `repo_name` from dump `dump_name`."""
203 210 repos_path = ScmModel().repos_path
204 211 target_path = os.path.join(repos_path, repo_name)
205 212 rc_testdata.extract_git_dump(dump_name, target_path)
206 213 return target_path
207 214
208 215
209 216 def extract_hg_repo_from_dump(dump_name, repo_name):
210 217 """Create hg repo `repo_name` from dump `dump_name`."""
211 218 repos_path = ScmModel().repos_path
212 219 target_path = os.path.join(repos_path, repo_name)
213 220 rc_testdata.extract_hg_dump(dump_name, target_path)
214 221 return target_path
215 222
216 223
217 224 def extract_svn_repo_from_dump(dump_name, repo_name):
218 225 """Create a svn repo `repo_name` from dump `dump_name`."""
219 226 repos_path = ScmModel().repos_path
220 227 target_path = os.path.join(repos_path, repo_name)
221 228 SubversionRepository(target_path, create=True)
222 229 _load_svn_dump_into_repo(dump_name, target_path)
223 230 return target_path
224 231
225 232
226 233 def assert_message_in_log(log_records, message, levelno, module):
227 234 messages = [
228 235 r.message for r in log_records
229 236 if r.module == module and r.levelno == levelno
230 237 ]
231 238 assert message in messages
232 239
233 240
234 241 def _load_svn_dump_into_repo(dump_name, repo_path):
235 242 """
236 243 Utility to populate a svn repository with a named dump
237 244
238 245 Currently the dumps are in rc_testdata. They might later on be
239 246 integrated with the main repository once they stabilize more.
240 247 """
241 248 dump = rc_testdata.load_svn_dump(dump_name)
242 249 load_dump = subprocess.Popen(
243 250 ['svnadmin', 'load', repo_path],
244 251 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
245 252 stderr=subprocess.PIPE)
246 253 out, err = load_dump.communicate(dump)
247 254 if load_dump.returncode != 0:
248 255 log.error("Output of load_dump command: %s", out)
249 256 log.error("Error output of load_dump command: %s", err)
250 257 raise Exception(
251 258 'Failed to load dump "%s" into repository at path "%s".'
252 259 % (dump_name, repo_path))
253 260
254 261
255 262 class AssertResponse(object):
256 263 """
257 264 Utility that helps to assert things about a given HTML response.
258 265 """
259 266
260 267 def __init__(self, response):
261 268 self.response = response
262 269
263 270 def get_imports(self):
264 271 return fromstring, tostring, CSSSelector
265 272
266 273 def one_element_exists(self, css_selector):
267 274 self.get_element(css_selector)
268 275
269 276 def no_element_exists(self, css_selector):
270 277 assert not self._get_elements(css_selector)
271 278
272 279 def element_equals_to(self, css_selector, expected_content):
273 280 element = self.get_element(css_selector)
274 281 element_text = self._element_to_string(element)
275 282
276 283 assert expected_content in element_text
277 284
278 285 def element_contains(self, css_selector, expected_content):
279 286 element = self.get_element(css_selector)
280 287 assert expected_content in element.text_content()
281 288
282 289 def element_value_contains(self, css_selector, expected_content):
283 290 element = self.get_element(css_selector)
284 291 assert expected_content in element.value
285 292
286 293 def contains_one_link(self, link_text, href):
287 294 fromstring, tostring, CSSSelector = self.get_imports()
288 295 doc = fromstring(self.response.body)
289 296 sel = CSSSelector('a[href]')
290 297 elements = [
291 298 e for e in sel(doc) if e.text_content().strip() == link_text]
292 299 assert len(elements) == 1, "Did not find link or found multiple links"
293 300 self._ensure_url_equal(elements[0].attrib.get('href'), href)
294 301
295 302 def contains_one_anchor(self, anchor_id):
296 303 fromstring, tostring, CSSSelector = self.get_imports()
297 304 doc = fromstring(self.response.body)
298 305 sel = CSSSelector('#' + anchor_id)
299 306 elements = sel(doc)
300 307 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
301 308
302 309 def _ensure_url_equal(self, found, expected):
303 310 assert _Url(found) == _Url(expected)
304 311
305 312 def get_element(self, css_selector):
306 313 elements = self._get_elements(css_selector)
307 314 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
308 315 return elements[0]
309 316
310 317 def get_elements(self, css_selector):
311 318 return self._get_elements(css_selector)
312 319
313 320 def _get_elements(self, css_selector):
314 321 fromstring, tostring, CSSSelector = self.get_imports()
315 322 doc = fromstring(self.response.body)
316 323 sel = CSSSelector(css_selector)
317 324 elements = sel(doc)
318 325 return elements
319 326
320 327 def _element_to_string(self, element):
321 328 fromstring, tostring, CSSSelector = self.get_imports()
322 329 return tostring(element, encoding='unicode')
323 330
324 331
325 332 class _Url(object):
326 333 """
327 334 A url object that can be compared with other url orbjects
328 335 without regard to the vagaries of encoding, escaping, and ordering
329 336 of parameters in query strings.
330 337
331 338 Inspired by
332 339 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
333 340 """
334 341
335 342 def __init__(self, url):
336 343 parts = urllib.parse.urlparse(url)
337 344 _query = frozenset(urllib.parse.parse_qsl(parts.query))
338 345 _path = unquote_plus(parts.path)
339 346 parts = parts._replace(query=_query, path=_path)
340 347 self.parts = parts
341 348
342 349 def __eq__(self, other):
343 350 return self.parts == other.parts
344 351
345 352 def __hash__(self):
346 353 return hash(self.parts)
347 354
348 355
349 356 def run_test_concurrently(times, raise_catched_exc=True):
350 357 """
351 358 Add this decorator to small pieces of code that you want to test
352 359 concurrently
353 360
354 361 ex:
355 362
356 363 @test_concurrently(25)
357 364 def my_test_function():
358 365 ...
359 366 """
360 367 def test_concurrently_decorator(test_func):
361 368 def wrapper(*args, **kwargs):
362 369 exceptions = []
363 370
364 371 def call_test_func():
365 372 try:
366 373 test_func(*args, **kwargs)
367 374 except Exception as e:
368 375 exceptions.append(e)
369 376 if raise_catched_exc:
370 377 raise
371 378 threads = []
372 379 for i in range(times):
373 380 threads.append(threading.Thread(target=call_test_func))
374 381 for t in threads:
375 382 t.start()
376 383 for t in threads:
377 384 t.join()
378 385 if exceptions:
379 386 raise Exception(
380 387 'test_concurrently intercepted %s exceptions: %s' % (
381 388 len(exceptions), exceptions))
382 389 return wrapper
383 390 return test_concurrently_decorator
384 391
385 392
386 393 def wait_for_url(url, timeout=10):
387 394 """
388 395 Wait until URL becomes reachable.
389 396
390 397 It polls the URL until the timeout is reached or it became reachable.
391 398 If will call to `py.test.fail` in case the URL is not reachable.
392 399 """
393 400 timeout = time.time() + timeout
394 401 last = 0
395 402 wait = 0.1
396 403
397 404 while timeout > last:
398 405 last = time.time()
399 406 if is_url_reachable(url, log_exc=False):
400 407 break
401 408 elif (last + wait) > time.time():
402 409 # Go to sleep because not enough time has passed since last check.
403 410 time.sleep(wait)
404 411 else:
405 412 pytest.fail(f"Timeout while waiting for URL {url}")
406 413
407 414
408 415 def is_url_reachable(url: str, log_exc: bool = False) -> bool:
409 416 try:
410 417 urllib.request.urlopen(url)
411 418 except urllib.error.URLError:
412 419 if log_exc:
413 420 log.exception(f'URL `{url}` reach error')
414 421 return False
415 422 return True
416 423
417 424
418 425 def repo_on_filesystem(repo_name):
419 426 from rhodecode.lib import vcs
420 427 from rhodecode.tests import TESTS_TMP_PATH
421 428 repo = vcs.get_vcs_instance(
422 429 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
423 430 return repo is not None
424 431
425 432
426 433 def commit_change(
427 434 repo, filename: bytes, content: bytes, message, vcs_type, parent=None, newfile=False):
428 435 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
429 436
430 437 repo = Repository.get_by_repo_name(repo)
431 438 _commit = parent
432 439 if not parent:
433 440 _commit = EmptyCommit(alias=vcs_type)
434 441
435 442 if newfile:
436 443 nodes = {
437 444 filename: {
438 445 'content': content
439 446 }
440 447 }
441 448 commit = ScmModel().create_nodes(
442 449 user=TEST_USER_ADMIN_LOGIN, repo=repo,
443 450 message=message,
444 451 nodes=nodes,
445 452 parent_commit=_commit,
446 453 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
447 454 )
448 455 else:
449 456 commit = ScmModel().commit_change(
450 457 repo=repo.scm_instance(), repo_name=repo.repo_name,
451 458 commit=parent, user=TEST_USER_ADMIN_LOGIN,
452 459 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
453 460 message=message,
454 461 content=content,
455 462 f_path=filename
456 463 )
457 464 return commit
458 465
459 466
460 467 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
461 468 if not default:
462 469 raise ValueError('Permission for default user must be given')
463 470 form_data = [(
464 471 'csrf_token', csrf_token
465 472 )]
466 473 # add default
467 474 form_data.extend([
468 475 ('u_perm_1', default)
469 476 ])
470 477
471 478 if grant:
472 479 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
473 480 form_data.extend([
474 481 ('perm_new_member_perm_new{}'.format(cnt), perm),
475 482 ('perm_new_member_id_new{}'.format(cnt), obj_id),
476 483 ('perm_new_member_name_new{}'.format(cnt), obj_name),
477 484 ('perm_new_member_type_new{}'.format(cnt), obj_type),
478 485
479 486 ])
480 487 if revoke:
481 488 for obj_id, obj_type in revoke:
482 489 form_data.extend([
483 490 ('perm_del_member_id_{}'.format(obj_id), obj_id),
484 491 ('perm_del_member_type_{}'.format(obj_id), obj_type),
485 492 ])
486 493 return form_data
General Comments 0
You need to be logged in to leave comments. Login now