##// END OF EJS Templates
tests: allow logging exceptions on url reach check
super-admin -
r5023:5d32bc46 default
parent child Browse files
Show More
@@ -1,486 +1,487 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import threading
21 import threading
22 import time
22 import time
23 import logging
23 import logging
24 import os.path
24 import os.path
25 import subprocess
25 import subprocess
26 import tempfile
26 import tempfile
27 import urllib.request
27 import urllib.request
28 import urllib.error
28 import urllib.error
29 import urllib.parse
29 import urllib.parse
30 from lxml.html import fromstring, tostring
30 from lxml.html import fromstring, tostring
31 from lxml.cssselect import CSSSelector
31 from lxml.cssselect import CSSSelector
32 from urllib.parse import unquote_plus
32 from urllib.parse import unquote_plus
33 import webob
33 import webob
34
34
35 from webtest.app import TestResponse, TestApp
35 from webtest.app import TestResponse, TestApp
36 from webtest.compat import print_stderr
36 from webtest.compat import print_stderr
37
37
38 import pytest
38 import pytest
39
39
40 try:
40 try:
41 import rc_testdata
41 import rc_testdata
42 except ImportError:
42 except ImportError:
43 raise ImportError('Failed to import rc_testdata, '
43 raise ImportError('Failed to import rc_testdata, '
44 'please make sure this package is installed from requirements_test.txt')
44 'please make sure this package is installed from requirements_test.txt')
45
45
46 from rhodecode.model.db import User, Repository
46 from rhodecode.model.db import User, Repository
47 from rhodecode.model.meta import Session
47 from rhodecode.model.meta import Session
48 from rhodecode.model.scm import ScmModel
48 from rhodecode.model.scm import ScmModel
49 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
49 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
50 from rhodecode.lib.vcs.backends.base import EmptyCommit
50 from rhodecode.lib.vcs.backends.base import EmptyCommit
51 from rhodecode.tests import login_user_session
51 from rhodecode.tests import login_user_session
52
52
53 log = logging.getLogger(__name__)
53 log = logging.getLogger(__name__)
54
54
55
55
56 class CustomTestResponse(TestResponse):
56 class CustomTestResponse(TestResponse):
57
57
58 def _save_output(self, out):
58 def _save_output(self, out):
59 f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html')
59 f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html')
60 f.write(out)
60 f.write(out)
61 return f.name
61 return f.name
62
62
63 def mustcontain(self, *strings, **kw):
63 def mustcontain(self, *strings, **kw):
64 """
64 """
65 Assert that the response contains all of the strings passed
65 Assert that the response contains all of the strings passed
66 in as arguments.
66 in as arguments.
67
67
68 Equivalent to::
68 Equivalent to::
69
69
70 assert string in res
70 assert string in res
71 """
71 """
72 print_body = kw.pop('print_body', False)
72 print_body = kw.pop('print_body', False)
73 if 'no' in kw:
73 if 'no' in kw:
74 no = kw['no']
74 no = kw['no']
75 del kw['no']
75 del kw['no']
76 if isinstance(no, str):
76 if isinstance(no, str):
77 no = [no]
77 no = [no]
78 else:
78 else:
79 no = []
79 no = []
80 if kw:
80 if kw:
81 raise TypeError(
81 raise TypeError(
82 "The only keyword argument allowed is 'no' got %s" % kw)
82 "The only keyword argument allowed is 'no' got %s" % kw)
83
83
84 f = self._save_output(str(self))
84 f = self._save_output(str(self))
85
85
86 for s in strings:
86 for s in strings:
87 if s not in self:
87 if s not in self:
88 print_stderr("Actual response (no %r):" % s)
88 print_stderr("Actual response (no %r):" % s)
89 print_stderr("body output saved as `%s`" % f)
89 print_stderr("body output saved as `%s`" % f)
90 if print_body:
90 if print_body:
91 print_stderr(str(self))
91 print_stderr(str(self))
92 raise IndexError(
92 raise IndexError(
93 "Body does not contain string %r, body output saved as %s" % (s, f))
93 "Body does not contain string %r, body output saved as %s" % (s, f))
94
94
95 for no_s in no:
95 for no_s in no:
96 if no_s in self:
96 if no_s in self:
97 print_stderr("Actual response (has %r)" % no_s)
97 print_stderr("Actual response (has %r)" % no_s)
98 print_stderr("body output saved as `%s`" % f)
98 print_stderr("body output saved as `%s`" % f)
99 if print_body:
99 if print_body:
100 print_stderr(str(self))
100 print_stderr(str(self))
101 raise IndexError(
101 raise IndexError(
102 "Body contains bad string %r, body output saved as %s" % (no_s, f))
102 "Body contains bad string %r, body output saved as %s" % (no_s, f))
103
103
104 def assert_response(self):
104 def assert_response(self):
105 return AssertResponse(self)
105 return AssertResponse(self)
106
106
107 def get_session_from_response(self):
107 def get_session_from_response(self):
108 """
108 """
109 This returns the session from a response object.
109 This returns the session from a response object.
110 """
110 """
111 from rhodecode.lib.rc_beaker import session_factory_from_settings
111 from rhodecode.lib.rc_beaker import session_factory_from_settings
112 session = session_factory_from_settings(self.test_app._pyramid_settings)
112 session = session_factory_from_settings(self.test_app._pyramid_settings)
113 return session(self.request)
113 return session(self.request)
114
114
115
115
116 class TestRequest(webob.BaseRequest):
116 class TestRequest(webob.BaseRequest):
117
117
118 # for py.test, so it doesn't try to run this tas by name starting with test...
118 # for py.test, so it doesn't try to run this tas by name starting with test...
119 disabled = True
119 disabled = True
120 ResponseClass = CustomTestResponse
120 ResponseClass = CustomTestResponse
121
121
122 def add_response_callback(self, callback):
122 def add_response_callback(self, callback):
123 pass
123 pass
124
124
125 @classmethod
125 @classmethod
126 def blank(cls, path, environ=None, base_url=None,
126 def blank(cls, path, environ=None, base_url=None,
127 headers=None, POST=None, **kw):
127 headers=None, POST=None, **kw):
128
128
129 if not path.isascii():
129 if not path.isascii():
130 # our custom quote path if it contains non-ascii chars
130 # our custom quote path if it contains non-ascii chars
131 path = urllib.parse.quote(path)
131 path = urllib.parse.quote(path)
132
132
133 return super(TestRequest, cls).blank(
133 return super(TestRequest, cls).blank(
134 path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
134 path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
135
135
136
136
137 class CustomTestApp(TestApp):
137 class CustomTestApp(TestApp):
138 """
138 """
139 Custom app to make mustcontain more Useful, and extract special methods
139 Custom app to make mustcontain more Useful, and extract special methods
140 """
140 """
141 RequestClass = TestRequest
141 RequestClass = TestRequest
142 rc_login_data = {}
142 rc_login_data = {}
143 rc_current_session = None
143 rc_current_session = None
144
144
145 def login(self, username=None, password=None):
145 def login(self, username=None, password=None):
146 from rhodecode.lib import auth
146 from rhodecode.lib import auth
147
147
148 if username and password:
148 if username and password:
149 session = login_user_session(self, username, password)
149 session = login_user_session(self, username, password)
150 else:
150 else:
151 session = login_user_session(self)
151 session = login_user_session(self)
152
152
153 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
153 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
154 self.rc_current_session = session
154 self.rc_current_session = session
155 return session['rhodecode_user']
155 return session['rhodecode_user']
156
156
157 @property
157 @property
158 def csrf_token(self):
158 def csrf_token(self):
159 return self.rc_login_data['csrf_token']
159 return self.rc_login_data['csrf_token']
160
160
161 @property
161 @property
162 def _pyramid_registry(self):
162 def _pyramid_registry(self):
163 return self.app.config.registry
163 return self.app.config.registry
164
164
165 @property
165 @property
166 def _pyramid_settings(self):
166 def _pyramid_settings(self):
167 return self._pyramid_registry.settings
167 return self._pyramid_registry.settings
168
168
169
169
170 def set_anonymous_access(enabled):
170 def set_anonymous_access(enabled):
171 """(Dis)allows anonymous access depending on parameter `enabled`"""
171 """(Dis)allows anonymous access depending on parameter `enabled`"""
172 user = User.get_default_user()
172 user = User.get_default_user()
173 user.active = enabled
173 user.active = enabled
174 Session().add(user)
174 Session().add(user)
175 Session().commit()
175 Session().commit()
176 time.sleep(1.5) # must sleep for cache (1s to expire)
176 time.sleep(1.5) # must sleep for cache (1s to expire)
177 log.info('anonymous access is now: %s', enabled)
177 log.info('anonymous access is now: %s', enabled)
178 assert enabled == User.get_default_user().active, (
178 assert enabled == User.get_default_user().active, (
179 'Cannot set anonymous access')
179 'Cannot set anonymous access')
180
180
181
181
182 def check_xfail_backends(node, backend_alias):
182 def check_xfail_backends(node, backend_alias):
183 # Using "xfail_backends" here intentionally, since this marks work
183 # Using "xfail_backends" here intentionally, since this marks work
184 # which is "to be done" soon.
184 # which is "to be done" soon.
185 skip_marker = node.get_closest_marker('xfail_backends')
185 skip_marker = node.get_closest_marker('xfail_backends')
186 if skip_marker and backend_alias in skip_marker.args:
186 if skip_marker and backend_alias in skip_marker.args:
187 msg = "Support for backend %s to be developed." % (backend_alias, )
187 msg = "Support for backend %s to be developed." % (backend_alias, )
188 msg = skip_marker.kwargs.get('reason', msg)
188 msg = skip_marker.kwargs.get('reason', msg)
189 pytest.xfail(msg)
189 pytest.xfail(msg)
190
190
191
191
192 def check_skip_backends(node, backend_alias):
192 def check_skip_backends(node, backend_alias):
193 # Using "skip_backends" here intentionally, since this marks work which is
193 # Using "skip_backends" here intentionally, since this marks work which is
194 # not supported.
194 # not supported.
195 skip_marker = node.get_closest_marker('skip_backends')
195 skip_marker = node.get_closest_marker('skip_backends')
196 if skip_marker and backend_alias in skip_marker.args:
196 if skip_marker and backend_alias in skip_marker.args:
197 msg = "Feature not supported for backend %s." % (backend_alias, )
197 msg = "Feature not supported for backend %s." % (backend_alias, )
198 msg = skip_marker.kwargs.get('reason', msg)
198 msg = skip_marker.kwargs.get('reason', msg)
199 pytest.skip(msg)
199 pytest.skip(msg)
200
200
201
201
202 def extract_git_repo_from_dump(dump_name, repo_name):
202 def extract_git_repo_from_dump(dump_name, repo_name):
203 """Create git repo `repo_name` from dump `dump_name`."""
203 """Create git repo `repo_name` from dump `dump_name`."""
204 repos_path = ScmModel().repos_path
204 repos_path = ScmModel().repos_path
205 target_path = os.path.join(repos_path, repo_name)
205 target_path = os.path.join(repos_path, repo_name)
206 rc_testdata.extract_git_dump(dump_name, target_path)
206 rc_testdata.extract_git_dump(dump_name, target_path)
207 return target_path
207 return target_path
208
208
209
209
210 def extract_hg_repo_from_dump(dump_name, repo_name):
210 def extract_hg_repo_from_dump(dump_name, repo_name):
211 """Create hg repo `repo_name` from dump `dump_name`."""
211 """Create hg repo `repo_name` from dump `dump_name`."""
212 repos_path = ScmModel().repos_path
212 repos_path = ScmModel().repos_path
213 target_path = os.path.join(repos_path, repo_name)
213 target_path = os.path.join(repos_path, repo_name)
214 rc_testdata.extract_hg_dump(dump_name, target_path)
214 rc_testdata.extract_hg_dump(dump_name, target_path)
215 return target_path
215 return target_path
216
216
217
217
218 def extract_svn_repo_from_dump(dump_name, repo_name):
218 def extract_svn_repo_from_dump(dump_name, repo_name):
219 """Create a svn repo `repo_name` from dump `dump_name`."""
219 """Create a svn repo `repo_name` from dump `dump_name`."""
220 repos_path = ScmModel().repos_path
220 repos_path = ScmModel().repos_path
221 target_path = os.path.join(repos_path, repo_name)
221 target_path = os.path.join(repos_path, repo_name)
222 SubversionRepository(target_path, create=True)
222 SubversionRepository(target_path, create=True)
223 _load_svn_dump_into_repo(dump_name, target_path)
223 _load_svn_dump_into_repo(dump_name, target_path)
224 return target_path
224 return target_path
225
225
226
226
227 def assert_message_in_log(log_records, message, levelno, module):
227 def assert_message_in_log(log_records, message, levelno, module):
228 messages = [
228 messages = [
229 r.message for r in log_records
229 r.message for r in log_records
230 if r.module == module and r.levelno == levelno
230 if r.module == module and r.levelno == levelno
231 ]
231 ]
232 assert message in messages
232 assert message in messages
233
233
234
234
235 def _load_svn_dump_into_repo(dump_name, repo_path):
235 def _load_svn_dump_into_repo(dump_name, repo_path):
236 """
236 """
237 Utility to populate a svn repository with a named dump
237 Utility to populate a svn repository with a named dump
238
238
239 Currently the dumps are in rc_testdata. They might later on be
239 Currently the dumps are in rc_testdata. They might later on be
240 integrated with the main repository once they stabilize more.
240 integrated with the main repository once they stabilize more.
241 """
241 """
242 dump = rc_testdata.load_svn_dump(dump_name)
242 dump = rc_testdata.load_svn_dump(dump_name)
243 load_dump = subprocess.Popen(
243 load_dump = subprocess.Popen(
244 ['svnadmin', 'load', repo_path],
244 ['svnadmin', 'load', repo_path],
245 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
245 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
246 stderr=subprocess.PIPE)
246 stderr=subprocess.PIPE)
247 out, err = load_dump.communicate(dump)
247 out, err = load_dump.communicate(dump)
248 if load_dump.returncode != 0:
248 if load_dump.returncode != 0:
249 log.error("Output of load_dump command: %s", out)
249 log.error("Output of load_dump command: %s", out)
250 log.error("Error output of load_dump command: %s", err)
250 log.error("Error output of load_dump command: %s", err)
251 raise Exception(
251 raise Exception(
252 'Failed to load dump "%s" into repository at path "%s".'
252 'Failed to load dump "%s" into repository at path "%s".'
253 % (dump_name, repo_path))
253 % (dump_name, repo_path))
254
254
255
255
256 class AssertResponse(object):
256 class AssertResponse(object):
257 """
257 """
258 Utility that helps to assert things about a given HTML response.
258 Utility that helps to assert things about a given HTML response.
259 """
259 """
260
260
261 def __init__(self, response):
261 def __init__(self, response):
262 self.response = response
262 self.response = response
263
263
264 def get_imports(self):
264 def get_imports(self):
265 return fromstring, tostring, CSSSelector
265 return fromstring, tostring, CSSSelector
266
266
267 def one_element_exists(self, css_selector):
267 def one_element_exists(self, css_selector):
268 self.get_element(css_selector)
268 self.get_element(css_selector)
269
269
270 def no_element_exists(self, css_selector):
270 def no_element_exists(self, css_selector):
271 assert not self._get_elements(css_selector)
271 assert not self._get_elements(css_selector)
272
272
273 def element_equals_to(self, css_selector, expected_content):
273 def element_equals_to(self, css_selector, expected_content):
274 element = self.get_element(css_selector)
274 element = self.get_element(css_selector)
275 element_text = self._element_to_string(element)
275 element_text = self._element_to_string(element)
276
276
277 assert expected_content in element_text
277 assert expected_content in element_text
278
278
279 def element_contains(self, css_selector, expected_content):
279 def element_contains(self, css_selector, expected_content):
280 element = self.get_element(css_selector)
280 element = self.get_element(css_selector)
281 assert expected_content in element.text_content()
281 assert expected_content in element.text_content()
282
282
283 def element_value_contains(self, css_selector, expected_content):
283 def element_value_contains(self, css_selector, expected_content):
284 element = self.get_element(css_selector)
284 element = self.get_element(css_selector)
285 assert expected_content in element.value
285 assert expected_content in element.value
286
286
287 def contains_one_link(self, link_text, href):
287 def contains_one_link(self, link_text, href):
288 fromstring, tostring, CSSSelector = self.get_imports()
288 fromstring, tostring, CSSSelector = self.get_imports()
289 doc = fromstring(self.response.body)
289 doc = fromstring(self.response.body)
290 sel = CSSSelector('a[href]')
290 sel = CSSSelector('a[href]')
291 elements = [
291 elements = [
292 e for e in sel(doc) if e.text_content().strip() == link_text]
292 e for e in sel(doc) if e.text_content().strip() == link_text]
293 assert len(elements) == 1, "Did not find link or found multiple links"
293 assert len(elements) == 1, "Did not find link or found multiple links"
294 self._ensure_url_equal(elements[0].attrib.get('href'), href)
294 self._ensure_url_equal(elements[0].attrib.get('href'), href)
295
295
296 def contains_one_anchor(self, anchor_id):
296 def contains_one_anchor(self, anchor_id):
297 fromstring, tostring, CSSSelector = self.get_imports()
297 fromstring, tostring, CSSSelector = self.get_imports()
298 doc = fromstring(self.response.body)
298 doc = fromstring(self.response.body)
299 sel = CSSSelector('#' + anchor_id)
299 sel = CSSSelector('#' + anchor_id)
300 elements = sel(doc)
300 elements = sel(doc)
301 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
301 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
302
302
303 def _ensure_url_equal(self, found, expected):
303 def _ensure_url_equal(self, found, expected):
304 assert _Url(found) == _Url(expected)
304 assert _Url(found) == _Url(expected)
305
305
306 def get_element(self, css_selector):
306 def get_element(self, css_selector):
307 elements = self._get_elements(css_selector)
307 elements = self._get_elements(css_selector)
308 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
308 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
309 return elements[0]
309 return elements[0]
310
310
311 def get_elements(self, css_selector):
311 def get_elements(self, css_selector):
312 return self._get_elements(css_selector)
312 return self._get_elements(css_selector)
313
313
314 def _get_elements(self, css_selector):
314 def _get_elements(self, css_selector):
315 fromstring, tostring, CSSSelector = self.get_imports()
315 fromstring, tostring, CSSSelector = self.get_imports()
316 doc = fromstring(self.response.body)
316 doc = fromstring(self.response.body)
317 sel = CSSSelector(css_selector)
317 sel = CSSSelector(css_selector)
318 elements = sel(doc)
318 elements = sel(doc)
319 return elements
319 return elements
320
320
321 def _element_to_string(self, element):
321 def _element_to_string(self, element):
322 fromstring, tostring, CSSSelector = self.get_imports()
322 fromstring, tostring, CSSSelector = self.get_imports()
323 return tostring(element, encoding='unicode')
323 return tostring(element, encoding='unicode')
324
324
325
325
326 class _Url(object):
326 class _Url(object):
327 """
327 """
328 A url object that can be compared with other url orbjects
328 A url object that can be compared with other url orbjects
329 without regard to the vagaries of encoding, escaping, and ordering
329 without regard to the vagaries of encoding, escaping, and ordering
330 of parameters in query strings.
330 of parameters in query strings.
331
331
332 Inspired by
332 Inspired by
333 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
333 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
334 """
334 """
335
335
336 def __init__(self, url):
336 def __init__(self, url):
337 parts = urllib.parse.urlparse(url)
337 parts = urllib.parse.urlparse(url)
338 _query = frozenset(urllib.parse.parse_qsl(parts.query))
338 _query = frozenset(urllib.parse.parse_qsl(parts.query))
339 _path = unquote_plus(parts.path)
339 _path = unquote_plus(parts.path)
340 parts = parts._replace(query=_query, path=_path)
340 parts = parts._replace(query=_query, path=_path)
341 self.parts = parts
341 self.parts = parts
342
342
343 def __eq__(self, other):
343 def __eq__(self, other):
344 return self.parts == other.parts
344 return self.parts == other.parts
345
345
346 def __hash__(self):
346 def __hash__(self):
347 return hash(self.parts)
347 return hash(self.parts)
348
348
349
349
350 def run_test_concurrently(times, raise_catched_exc=True):
350 def run_test_concurrently(times, raise_catched_exc=True):
351 """
351 """
352 Add this decorator to small pieces of code that you want to test
352 Add this decorator to small pieces of code that you want to test
353 concurrently
353 concurrently
354
354
355 ex:
355 ex:
356
356
357 @test_concurrently(25)
357 @test_concurrently(25)
358 def my_test_function():
358 def my_test_function():
359 ...
359 ...
360 """
360 """
361 def test_concurrently_decorator(test_func):
361 def test_concurrently_decorator(test_func):
362 def wrapper(*args, **kwargs):
362 def wrapper(*args, **kwargs):
363 exceptions = []
363 exceptions = []
364
364
365 def call_test_func():
365 def call_test_func():
366 try:
366 try:
367 test_func(*args, **kwargs)
367 test_func(*args, **kwargs)
368 except Exception as e:
368 except Exception as e:
369 exceptions.append(e)
369 exceptions.append(e)
370 if raise_catched_exc:
370 if raise_catched_exc:
371 raise
371 raise
372 threads = []
372 threads = []
373 for i in range(times):
373 for i in range(times):
374 threads.append(threading.Thread(target=call_test_func))
374 threads.append(threading.Thread(target=call_test_func))
375 for t in threads:
375 for t in threads:
376 t.start()
376 t.start()
377 for t in threads:
377 for t in threads:
378 t.join()
378 t.join()
379 if exceptions:
379 if exceptions:
380 raise Exception(
380 raise Exception(
381 'test_concurrently intercepted %s exceptions: %s' % (
381 'test_concurrently intercepted %s exceptions: %s' % (
382 len(exceptions), exceptions))
382 len(exceptions), exceptions))
383 return wrapper
383 return wrapper
384 return test_concurrently_decorator
384 return test_concurrently_decorator
385
385
386
386
387 def wait_for_url(url, timeout=10):
387 def wait_for_url(url, timeout=10):
388 """
388 """
389 Wait until URL becomes reachable.
389 Wait until URL becomes reachable.
390
390
391 It polls the URL until the timeout is reached or it became reachable.
391 It polls the URL until the timeout is reached or it became reachable.
392 If will call to `py.test.fail` in case the URL is not reachable.
392 If will call to `py.test.fail` in case the URL is not reachable.
393 """
393 """
394 timeout = time.time() + timeout
394 timeout = time.time() + timeout
395 last = 0
395 last = 0
396 wait = 0.1
396 wait = 0.1
397
397
398 while timeout > last:
398 while timeout > last:
399 last = time.time()
399 last = time.time()
400 if is_url_reachable(url):
400 if is_url_reachable(url, log_exc=False):
401 break
401 break
402 elif (last + wait) > time.time():
402 elif (last + wait) > time.time():
403 # Go to sleep because not enough time has passed since last check.
403 # Go to sleep because not enough time has passed since last check.
404 time.sleep(wait)
404 time.sleep(wait)
405 else:
405 else:
406 pytest.fail("Timeout while waiting for URL {}".format(url))
406 pytest.fail(f"Timeout while waiting for URL {url}")
407
407
408
408
409 def is_url_reachable(url):
409 def is_url_reachable(url: str, log_exc: bool = True) -> bool:
410 try:
410 try:
411 urllib.request.urlopen(url)
411 urllib.request.urlopen(url)
412 except urllib.error.URLError:
412 except urllib.error.URLError:
413 log.exception('URL `{}` reach error'.format(url))
413 if log_exc:
414 log.exception('URL `{}` reach error'.format(url))
414 return False
415 return False
415 return True
416 return True
416
417
417
418
418 def repo_on_filesystem(repo_name):
419 def repo_on_filesystem(repo_name):
419 from rhodecode.lib import vcs
420 from rhodecode.lib import vcs
420 from rhodecode.tests import TESTS_TMP_PATH
421 from rhodecode.tests import TESTS_TMP_PATH
421 repo = vcs.get_vcs_instance(
422 repo = vcs.get_vcs_instance(
422 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
423 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
423 return repo is not None
424 return repo is not None
424
425
425
426
426 def commit_change(
427 def commit_change(
427 repo, filename, content, message, vcs_type, parent=None, newfile=False):
428 repo, filename, content, message, vcs_type, parent=None, newfile=False):
428 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
429 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
429
430
430 repo = Repository.get_by_repo_name(repo)
431 repo = Repository.get_by_repo_name(repo)
431 _commit = parent
432 _commit = parent
432 if not parent:
433 if not parent:
433 _commit = EmptyCommit(alias=vcs_type)
434 _commit = EmptyCommit(alias=vcs_type)
434
435
435 if newfile:
436 if newfile:
436 nodes = {
437 nodes = {
437 filename: {
438 filename: {
438 'content': content
439 'content': content
439 }
440 }
440 }
441 }
441 commit = ScmModel().create_nodes(
442 commit = ScmModel().create_nodes(
442 user=TEST_USER_ADMIN_LOGIN, repo=repo,
443 user=TEST_USER_ADMIN_LOGIN, repo=repo,
443 message=message,
444 message=message,
444 nodes=nodes,
445 nodes=nodes,
445 parent_commit=_commit,
446 parent_commit=_commit,
446 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
447 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
447 )
448 )
448 else:
449 else:
449 commit = ScmModel().commit_change(
450 commit = ScmModel().commit_change(
450 repo=repo.scm_instance(), repo_name=repo.repo_name,
451 repo=repo.scm_instance(), repo_name=repo.repo_name,
451 commit=parent, user=TEST_USER_ADMIN_LOGIN,
452 commit=parent, user=TEST_USER_ADMIN_LOGIN,
452 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
453 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
453 message=message,
454 message=message,
454 content=content,
455 content=content,
455 f_path=filename
456 f_path=filename
456 )
457 )
457 return commit
458 return commit
458
459
459
460
460 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
461 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
461 if not default:
462 if not default:
462 raise ValueError('Permission for default user must be given')
463 raise ValueError('Permission for default user must be given')
463 form_data = [(
464 form_data = [(
464 'csrf_token', csrf_token
465 'csrf_token', csrf_token
465 )]
466 )]
466 # add default
467 # add default
467 form_data.extend([
468 form_data.extend([
468 ('u_perm_1', default)
469 ('u_perm_1', default)
469 ])
470 ])
470
471
471 if grant:
472 if grant:
472 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
473 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
473 form_data.extend([
474 form_data.extend([
474 ('perm_new_member_perm_new{}'.format(cnt), perm),
475 ('perm_new_member_perm_new{}'.format(cnt), perm),
475 ('perm_new_member_id_new{}'.format(cnt), obj_id),
476 ('perm_new_member_id_new{}'.format(cnt), obj_id),
476 ('perm_new_member_name_new{}'.format(cnt), obj_name),
477 ('perm_new_member_name_new{}'.format(cnt), obj_name),
477 ('perm_new_member_type_new{}'.format(cnt), obj_type),
478 ('perm_new_member_type_new{}'.format(cnt), obj_type),
478
479
479 ])
480 ])
480 if revoke:
481 if revoke:
481 for obj_id, obj_type in revoke:
482 for obj_id, obj_type in revoke:
482 form_data.extend([
483 form_data.extend([
483 ('perm_del_member_id_{}'.format(obj_id), obj_id),
484 ('perm_del_member_id_{}'.format(obj_id), obj_id),
484 ('perm_del_member_type_{}'.format(obj_id), obj_type),
485 ('perm_del_member_type_{}'.format(obj_id), obj_type),
485 ])
486 ])
486 return form_data
487 return form_data
General Comments 0
You need to be logged in to leave comments. Login now