##// END OF EJS Templates
tests: fixed pyramid_beaker import
marcink -
r3765:8fd3d071 new-ui
parent child Browse files
Show More
@@ -1,465 +1,464 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess32
26 26 import tempfile
27 27 import urllib2
28 28 from lxml.html import fromstring, tostring
29 29 from lxml.cssselect import CSSSelector
30 30 from urlparse import urlparse, parse_qsl
31 31 from urllib import unquote_plus
32 32 import webob
33 33
34 34 from webtest.app import TestResponse, TestApp, string_types
35 35 from webtest.compat import print_stderr
36 36
37 37 import pytest
38 38 import rc_testdata
39 39
40 40 from rhodecode.model.db import User, Repository
41 41 from rhodecode.model.meta import Session
42 42 from rhodecode.model.scm import ScmModel
43 43 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
44 44 from rhodecode.lib.vcs.backends.base import EmptyCommit
45 45 from rhodecode.tests import login_user_session
46 46
47 47 log = logging.getLogger(__name__)
48 48
49 49
50 50 class CustomTestResponse(TestResponse):
51 51 def _save_output(self, out):
52 52 f = tempfile.NamedTemporaryFile(
53 53 delete=False, prefix='rc-test-', suffix='.html')
54 54 f.write(out)
55 55 return f.name
56 56
57 57 def mustcontain(self, *strings, **kw):
58 58 """
59 59 Assert that the response contains all of the strings passed
60 60 in as arguments.
61 61
62 62 Equivalent to::
63 63
64 64 assert string in res
65 65 """
66 66 if 'no' in kw:
67 67 no = kw['no']
68 68 del kw['no']
69 69 if isinstance(no, string_types):
70 70 no = [no]
71 71 else:
72 72 no = []
73 73 if kw:
74 74 raise TypeError(
75 75 "The only keyword argument allowed is 'no' got %s" % kw)
76 76
77 77 f = self._save_output(str(self))
78 78
79 79 for s in strings:
80 80 if not s in self:
81 81 print_stderr("Actual response (no %r):" % s)
82 82 print_stderr(str(self))
83 83 raise IndexError(
84 84 "Body does not contain string %r, output saved as %s" % (
85 85 s, f))
86 86
87 87 for no_s in no:
88 88 if no_s in self:
89 89 print_stderr("Actual response (has %r)" % no_s)
90 90 print_stderr(str(self))
91 91 raise IndexError(
92 92 "Body contains bad string %r, output saved as %s" % (
93 93 no_s, f))
94 94
95 95 def assert_response(self):
96 96 return AssertResponse(self)
97 97
98 98 def get_session_from_response(self):
99 99 """
100 100 This returns the session from a response object.
101 101 """
102
103 from pyramid_beaker import session_factory_from_settings
102 from rhodecode.lib.rc_beaker import session_factory_from_settings
104 103 session = session_factory_from_settings(self.test_app._pyramid_settings)
105 104 return session(self.request)
106 105
107 106
108 107 class TestRequest(webob.BaseRequest):
109 108
110 109 # for py.test
111 110 disabled = True
112 111 ResponseClass = CustomTestResponse
113 112
114 113 def add_response_callback(self, callback):
115 114 pass
116 115
117 116
118 117 class CustomTestApp(TestApp):
119 118 """
120 119 Custom app to make mustcontain more Useful, and extract special methods
121 120 """
122 121 RequestClass = TestRequest
123 122 rc_login_data = {}
124 123 rc_current_session = None
125 124
126 125 def login(self, username=None, password=None):
127 126 from rhodecode.lib import auth
128 127
129 128 if username and password:
130 129 session = login_user_session(self, username, password)
131 130 else:
132 131 session = login_user_session(self)
133 132
134 133 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
135 134 self.rc_current_session = session
136 135 return session['rhodecode_user']
137 136
138 137 @property
139 138 def csrf_token(self):
140 139 return self.rc_login_data['csrf_token']
141 140
142 141 @property
143 142 def _pyramid_registry(self):
144 143 return self.app.config.registry
145 144
146 145 @property
147 146 def _pyramid_settings(self):
148 147 return self._pyramid_registry.settings
149 148
150 149
151 150 def set_anonymous_access(enabled):
152 151 """(Dis)allows anonymous access depending on parameter `enabled`"""
153 152 user = User.get_default_user()
154 153 user.active = enabled
155 154 Session().add(user)
156 155 Session().commit()
157 156 time.sleep(1.5) # must sleep for cache (1s to expire)
158 157 log.info('anonymous access is now: %s', enabled)
159 158 assert enabled == User.get_default_user().active, (
160 159 'Cannot set anonymous access')
161 160
162 161
163 162 def check_xfail_backends(node, backend_alias):
164 163 # Using "xfail_backends" here intentionally, since this marks work
165 164 # which is "to be done" soon.
166 165 skip_marker = node.get_closest_marker('xfail_backends')
167 166 if skip_marker and backend_alias in skip_marker.args:
168 167 msg = "Support for backend %s to be developed." % (backend_alias, )
169 168 msg = skip_marker.kwargs.get('reason', msg)
170 169 pytest.xfail(msg)
171 170
172 171
173 172 def check_skip_backends(node, backend_alias):
174 173 # Using "skip_backends" here intentionally, since this marks work which is
175 174 # not supported.
176 175 skip_marker = node.get_closest_marker('skip_backends')
177 176 if skip_marker and backend_alias in skip_marker.args:
178 177 msg = "Feature not supported for backend %s." % (backend_alias, )
179 178 msg = skip_marker.kwargs.get('reason', msg)
180 179 pytest.skip(msg)
181 180
182 181
183 182 def extract_git_repo_from_dump(dump_name, repo_name):
184 183 """Create git repo `repo_name` from dump `dump_name`."""
185 184 repos_path = ScmModel().repos_path
186 185 target_path = os.path.join(repos_path, repo_name)
187 186 rc_testdata.extract_git_dump(dump_name, target_path)
188 187 return target_path
189 188
190 189
191 190 def extract_hg_repo_from_dump(dump_name, repo_name):
192 191 """Create hg repo `repo_name` from dump `dump_name`."""
193 192 repos_path = ScmModel().repos_path
194 193 target_path = os.path.join(repos_path, repo_name)
195 194 rc_testdata.extract_hg_dump(dump_name, target_path)
196 195 return target_path
197 196
198 197
199 198 def extract_svn_repo_from_dump(dump_name, repo_name):
200 199 """Create a svn repo `repo_name` from dump `dump_name`."""
201 200 repos_path = ScmModel().repos_path
202 201 target_path = os.path.join(repos_path, repo_name)
203 202 SubversionRepository(target_path, create=True)
204 203 _load_svn_dump_into_repo(dump_name, target_path)
205 204 return target_path
206 205
207 206
208 207 def assert_message_in_log(log_records, message, levelno, module):
209 208 messages = [
210 209 r.message for r in log_records
211 210 if r.module == module and r.levelno == levelno
212 211 ]
213 212 assert message in messages
214 213
215 214
216 215 def _load_svn_dump_into_repo(dump_name, repo_path):
217 216 """
218 217 Utility to populate a svn repository with a named dump
219 218
220 219 Currently the dumps are in rc_testdata. They might later on be
221 220 integrated with the main repository once they stabilize more.
222 221 """
223 222 dump = rc_testdata.load_svn_dump(dump_name)
224 223 load_dump = subprocess32.Popen(
225 224 ['svnadmin', 'load', repo_path],
226 225 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
227 226 stderr=subprocess32.PIPE)
228 227 out, err = load_dump.communicate(dump)
229 228 if load_dump.returncode != 0:
230 229 log.error("Output of load_dump command: %s", out)
231 230 log.error("Error output of load_dump command: %s", err)
232 231 raise Exception(
233 232 'Failed to load dump "%s" into repository at path "%s".'
234 233 % (dump_name, repo_path))
235 234
236 235
237 236 class AssertResponse(object):
238 237 """
239 238 Utility that helps to assert things about a given HTML response.
240 239 """
241 240
242 241 def __init__(self, response):
243 242 self.response = response
244 243
245 244 def get_imports(self):
246 245 return fromstring, tostring, CSSSelector
247 246
248 247 def one_element_exists(self, css_selector):
249 248 self.get_element(css_selector)
250 249
251 250 def no_element_exists(self, css_selector):
252 251 assert not self._get_elements(css_selector)
253 252
254 253 def element_equals_to(self, css_selector, expected_content):
255 254 element = self.get_element(css_selector)
256 255 element_text = self._element_to_string(element)
257 256 assert expected_content in element_text
258 257
259 258 def element_contains(self, css_selector, expected_content):
260 259 element = self.get_element(css_selector)
261 260 assert expected_content in element.text_content()
262 261
263 262 def element_value_contains(self, css_selector, expected_content):
264 263 element = self.get_element(css_selector)
265 264 assert expected_content in element.value
266 265
267 266 def contains_one_link(self, link_text, href):
268 267 fromstring, tostring, CSSSelector = self.get_imports()
269 268 doc = fromstring(self.response.body)
270 269 sel = CSSSelector('a[href]')
271 270 elements = [
272 271 e for e in sel(doc) if e.text_content().strip() == link_text]
273 272 assert len(elements) == 1, "Did not find link or found multiple links"
274 273 self._ensure_url_equal(elements[0].attrib.get('href'), href)
275 274
276 275 def contains_one_anchor(self, anchor_id):
277 276 fromstring, tostring, CSSSelector = self.get_imports()
278 277 doc = fromstring(self.response.body)
279 278 sel = CSSSelector('#' + anchor_id)
280 279 elements = sel(doc)
281 280 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
282 281
283 282 def _ensure_url_equal(self, found, expected):
284 283 assert _Url(found) == _Url(expected)
285 284
286 285 def get_element(self, css_selector):
287 286 elements = self._get_elements(css_selector)
288 287 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
289 288 return elements[0]
290 289
291 290 def get_elements(self, css_selector):
292 291 return self._get_elements(css_selector)
293 292
294 293 def _get_elements(self, css_selector):
295 294 fromstring, tostring, CSSSelector = self.get_imports()
296 295 doc = fromstring(self.response.body)
297 296 sel = CSSSelector(css_selector)
298 297 elements = sel(doc)
299 298 return elements
300 299
301 300 def _element_to_string(self, element):
302 301 fromstring, tostring, CSSSelector = self.get_imports()
303 302 return tostring(element)
304 303
305 304
306 305 class _Url(object):
307 306 """
308 307 A url object that can be compared with other url orbjects
309 308 without regard to the vagaries of encoding, escaping, and ordering
310 309 of parameters in query strings.
311 310
312 311 Inspired by
313 312 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
314 313 """
315 314
316 315 def __init__(self, url):
317 316 parts = urlparse(url)
318 317 _query = frozenset(parse_qsl(parts.query))
319 318 _path = unquote_plus(parts.path)
320 319 parts = parts._replace(query=_query, path=_path)
321 320 self.parts = parts
322 321
323 322 def __eq__(self, other):
324 323 return self.parts == other.parts
325 324
326 325 def __hash__(self):
327 326 return hash(self.parts)
328 327
329 328
330 329 def run_test_concurrently(times, raise_catched_exc=True):
331 330 """
332 331 Add this decorator to small pieces of code that you want to test
333 332 concurrently
334 333
335 334 ex:
336 335
337 336 @test_concurrently(25)
338 337 def my_test_function():
339 338 ...
340 339 """
341 340 def test_concurrently_decorator(test_func):
342 341 def wrapper(*args, **kwargs):
343 342 exceptions = []
344 343
345 344 def call_test_func():
346 345 try:
347 346 test_func(*args, **kwargs)
348 347 except Exception as e:
349 348 exceptions.append(e)
350 349 if raise_catched_exc:
351 350 raise
352 351 threads = []
353 352 for i in range(times):
354 353 threads.append(threading.Thread(target=call_test_func))
355 354 for t in threads:
356 355 t.start()
357 356 for t in threads:
358 357 t.join()
359 358 if exceptions:
360 359 raise Exception(
361 360 'test_concurrently intercepted %s exceptions: %s' % (
362 361 len(exceptions), exceptions))
363 362 return wrapper
364 363 return test_concurrently_decorator
365 364
366 365
367 366 def wait_for_url(url, timeout=10):
368 367 """
369 368 Wait until URL becomes reachable.
370 369
371 370 It polls the URL until the timeout is reached or it became reachable.
372 371 If will call to `py.test.fail` in case the URL is not reachable.
373 372 """
374 373 timeout = time.time() + timeout
375 374 last = 0
376 375 wait = 0.1
377 376
378 377 while timeout > last:
379 378 last = time.time()
380 379 if is_url_reachable(url):
381 380 break
382 381 elif (last + wait) > time.time():
383 382 # Go to sleep because not enough time has passed since last check.
384 383 time.sleep(wait)
385 384 else:
386 385 pytest.fail("Timeout while waiting for URL {}".format(url))
387 386
388 387
389 388 def is_url_reachable(url):
390 389 try:
391 390 urllib2.urlopen(url)
392 391 except urllib2.URLError:
393 392 return False
394 393 return True
395 394
396 395
397 396 def repo_on_filesystem(repo_name):
398 397 from rhodecode.lib import vcs
399 398 from rhodecode.tests import TESTS_TMP_PATH
400 399 repo = vcs.get_vcs_instance(
401 400 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
402 401 return repo is not None
403 402
404 403
405 404 def commit_change(
406 405 repo, filename, content, message, vcs_type, parent=None, newfile=False):
407 406 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
408 407
409 408 repo = Repository.get_by_repo_name(repo)
410 409 _commit = parent
411 410 if not parent:
412 411 _commit = EmptyCommit(alias=vcs_type)
413 412
414 413 if newfile:
415 414 nodes = {
416 415 filename: {
417 416 'content': content
418 417 }
419 418 }
420 419 commit = ScmModel().create_nodes(
421 420 user=TEST_USER_ADMIN_LOGIN, repo=repo,
422 421 message=message,
423 422 nodes=nodes,
424 423 parent_commit=_commit,
425 424 author=TEST_USER_ADMIN_LOGIN,
426 425 )
427 426 else:
428 427 commit = ScmModel().commit_change(
429 428 repo=repo.scm_instance(), repo_name=repo.repo_name,
430 429 commit=parent, user=TEST_USER_ADMIN_LOGIN,
431 430 author=TEST_USER_ADMIN_LOGIN,
432 431 message=message,
433 432 content=content,
434 433 f_path=filename
435 434 )
436 435 return commit
437 436
438 437
439 438 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
440 439 if not default:
441 440 raise ValueError('Permission for default user must be given')
442 441 form_data = [(
443 442 'csrf_token', csrf_token
444 443 )]
445 444 # add default
446 445 form_data.extend([
447 446 ('u_perm_1', default)
448 447 ])
449 448
450 449 if grant:
451 450 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
452 451 form_data.extend([
453 452 ('perm_new_member_perm_new{}'.format(cnt), perm),
454 453 ('perm_new_member_id_new{}'.format(cnt), obj_id),
455 454 ('perm_new_member_name_new{}'.format(cnt), obj_name),
456 455 ('perm_new_member_type_new{}'.format(cnt), obj_type),
457 456
458 457 ])
459 458 if revoke:
460 459 for obj_id, obj_type in revoke:
461 460 form_data.extend([
462 461 ('perm_del_member_id_{}'.format(obj_id), obj_id),
463 462 ('perm_del_member_type_{}'.format(obj_id), obj_type),
464 463 ])
465 464 return form_data
General Comments 0
You need to be logged in to leave comments. Login now