##// END OF EJS Templates
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
super-admin -
r5190:9f610147 default
parent child Browse files
Show More
@@ -1,486 +1,493 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import threading
19 import threading
20 import time
20 import time
21 import sys
21 import logging
22 import logging
22 import os.path
23 import os.path
23 import subprocess
24 import subprocess
24 import tempfile
25 import tempfile
25 import urllib.request
26 import urllib.request
26 import urllib.error
27 import urllib.error
27 import urllib.parse
28 import urllib.parse
28 from lxml.html import fromstring, tostring
29 from lxml.html import fromstring, tostring
29 from lxml.cssselect import CSSSelector
30 from lxml.cssselect import CSSSelector
30 from urllib.parse import unquote_plus
31 from urllib.parse import unquote_plus
31 import webob
32 import webob
32
33
33 from webtest.app import TestResponse, TestApp
34 from webtest.app import TestResponse, TestApp
34 from webtest.compat import print_stderr
35
35
36
36 import pytest
37 import pytest
37
38
38 try:
39 try:
39 import rc_testdata
40 import rc_testdata
40 except ImportError:
41 except ImportError:
41 raise ImportError('Failed to import rc_testdata, '
42 raise ImportError('Failed to import rc_testdata, '
42 'please make sure this package is installed from requirements_test.txt')
43 'please make sure this package is installed from requirements_test.txt')
43
44
44 from rhodecode.model.db import User, Repository
45 from rhodecode.model.db import User, Repository
45 from rhodecode.model.meta import Session
46 from rhodecode.model.meta import Session
46 from rhodecode.model.scm import ScmModel
47 from rhodecode.model.scm import ScmModel
47 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
48 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
48 from rhodecode.lib.vcs.backends.base import EmptyCommit
49 from rhodecode.lib.vcs.backends.base import EmptyCommit
49 from rhodecode.tests import login_user_session
50 from rhodecode.tests import login_user_session
50
51
51 log = logging.getLogger(__name__)
52 log = logging.getLogger(__name__)
52
53
53
54
55 def print_to_func(value, print_to=sys.stderr):
56 print(value, file=print_to)
57
58
54 class CustomTestResponse(TestResponse):
59 class CustomTestResponse(TestResponse):
55
60
56 def _save_output(self, out):
61 def _save_output(self, out):
57 f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html')
62 f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html')
58 f.write(out)
63 f.write(out)
59 return f.name
64 return f.name
60
65
61 def mustcontain(self, *strings, **kw):
66 def mustcontain(self, *strings, **kw):
62 """
67 """
63 Assert that the response contains all of the strings passed
68 Assert that the response contains all the strings passed
64 in as arguments.
69 in as arguments.
65
70
66 Equivalent to::
71 Equivalent to::
67
72
68 assert string in res
73 assert string in res
69 """
74 """
70 print_body = kw.pop('print_body', False)
75 print_body = kw.pop('print_body', False)
76 print_to = kw.pop('print_to', sys.stderr)
77
71 if 'no' in kw:
78 if 'no' in kw:
72 no = kw['no']
79 no = kw['no']
73 del kw['no']
80 del kw['no']
74 if isinstance(no, str):
81 if isinstance(no, str):
75 no = [no]
82 no = [no]
76 else:
83 else:
77 no = []
84 no = []
78 if kw:
85 if kw:
79 raise TypeError(f"The only keyword argument allowed is 'no' got {kw}")
86 raise TypeError(f"The only keyword argument allowed is 'no' got {kw}")
80
87
81 f = self._save_output(str(self))
88 f = self._save_output(str(self))
82
89
83 for s in strings:
90 for s in strings:
84 if s not in self:
91 if s not in self:
85 print_stderr(f"Actual response (no {s!r}):")
92 print_to_func(f"Actual response (no {s!r}):", print_to=print_to)
86 print_stderr(f"body output saved as `{f}`")
93 print_to_func(f"body output saved as `{f}`", print_to=print_to)
87 if print_body:
94 if print_body:
88 print_stderr(str(self))
95 print_to_func(str(self), print_to=print_to)
89 raise IndexError(f"Body does not contain string {s!r}, body output saved as {f}")
96 raise IndexError(f"Body does not contain string {s!r}, body output saved as {f}")
90
97
91 for no_s in no:
98 for no_s in no:
92 if no_s in self:
99 if no_s in self:
93 print_stderr(f"Actual response (has {no_s!r})")
100 print_to_func(f"Actual response (has {no_s!r})", print_to=print_to)
94 print_stderr(f"body output saved as `{f}`")
101 print_to_func(f"body output saved as `{f}`", print_to=print_to)
95 if print_body:
102 if print_body:
96 print_stderr(str(self))
103 print_to_func(str(self), print_to=print_to)
97 raise IndexError(f"Body contains bad string {no_s!r}, body output saved as {f}")
104 raise IndexError(f"Body contains bad string {no_s!r}, body output saved as {f}")
98
105
99 def assert_response(self):
106 def assert_response(self):
100 return AssertResponse(self)
107 return AssertResponse(self)
101
108
102 def get_session_from_response(self):
109 def get_session_from_response(self):
103 """
110 """
104 This returns the session from a response object.
111 This returns the session from a response object.
105 """
112 """
106 from rhodecode.lib.rc_beaker import session_factory_from_settings
113 from rhodecode.lib.rc_beaker import session_factory_from_settings
107 session = session_factory_from_settings(self.test_app._pyramid_settings)
114 session = session_factory_from_settings(self.test_app._pyramid_settings)
108 return session(self.request)
115 return session(self.request)
109
116
110
117
111 class TestRequest(webob.BaseRequest):
118 class TestRequest(webob.BaseRequest):
112
119
113 # for py.test, so it doesn't try to run this tas by name starting with test...
120 # for py.test, so it doesn't try to run this tas by name starting with test...
114 disabled = True
121 disabled = True
115 ResponseClass = CustomTestResponse
122 ResponseClass = CustomTestResponse
116
123
117 def add_response_callback(self, callback):
124 def add_response_callback(self, callback):
118 pass
125 pass
119
126
120 @classmethod
127 @classmethod
121 def blank(cls, path, environ=None, base_url=None,
128 def blank(cls, path, environ=None, base_url=None,
122 headers=None, POST=None, **kw):
129 headers=None, POST=None, **kw):
123
130
124 if not path.isascii():
131 if not path.isascii():
125 # our custom quote path if it contains non-ascii chars
132 # our custom quote path if it contains non-ascii chars
126 path = urllib.parse.quote(path)
133 path = urllib.parse.quote(path)
127
134
128 return super(TestRequest, cls).blank(
135 return super(TestRequest, cls).blank(
129 path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
136 path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
130
137
131
138
132 class CustomTestApp(TestApp):
139 class CustomTestApp(TestApp):
133 """
140 """
134 Custom app to make mustcontain more Useful, and extract special methods
141 Custom app to make mustcontain more Useful, and extract special methods
135 """
142 """
136 RequestClass = TestRequest
143 RequestClass = TestRequest
137 rc_login_data = {}
144 rc_login_data = {}
138 rc_current_session = None
145 rc_current_session = None
139
146
140 def login(self, username=None, password=None):
147 def login(self, username=None, password=None):
141 from rhodecode.lib import auth
148 from rhodecode.lib import auth
142
149
143 if username and password:
150 if username and password:
144 session = login_user_session(self, username, password)
151 session = login_user_session(self, username, password)
145 else:
152 else:
146 session = login_user_session(self)
153 session = login_user_session(self)
147
154
148 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
155 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
149 self.rc_current_session = session
156 self.rc_current_session = session
150 return session['rhodecode_user']
157 return session['rhodecode_user']
151
158
152 @property
159 @property
153 def csrf_token(self):
160 def csrf_token(self):
154 return self.rc_login_data['csrf_token']
161 return self.rc_login_data['csrf_token']
155
162
156 @property
163 @property
157 def _pyramid_registry(self):
164 def _pyramid_registry(self):
158 return self.app.config.registry
165 return self.app.config.registry
159
166
160 @property
167 @property
161 def _pyramid_settings(self):
168 def _pyramid_settings(self):
162 return self._pyramid_registry.settings
169 return self._pyramid_registry.settings
163
170
164 def do_request(self, req, status=None, expect_errors=None):
171 def do_request(self, req, status=None, expect_errors=None):
165 # you can put custom code here
172 # you can put custom code here
166 return super().do_request(req, status, expect_errors)
173 return super().do_request(req, status, expect_errors)
167
174
168
175
169 def set_anonymous_access(enabled):
176 def set_anonymous_access(enabled):
170 """(Dis)allows anonymous access depending on parameter `enabled`"""
177 """(Dis)allows anonymous access depending on parameter `enabled`"""
171 user = User.get_default_user()
178 user = User.get_default_user()
172 user.active = enabled
179 user.active = enabled
173 Session().add(user)
180 Session().add(user)
174 Session().commit()
181 Session().commit()
175 time.sleep(1.5) # must sleep for cache (1s to expire)
182 time.sleep(1.5) # must sleep for cache (1s to expire)
176 log.info('anonymous access is now: %s', enabled)
183 log.info('anonymous access is now: %s', enabled)
177 assert enabled == User.get_default_user().active, (
184 assert enabled == User.get_default_user().active, (
178 'Cannot set anonymous access')
185 'Cannot set anonymous access')
179
186
180
187
181 def check_xfail_backends(node, backend_alias):
188 def check_xfail_backends(node, backend_alias):
182 # Using "xfail_backends" here intentionally, since this marks work
189 # Using "xfail_backends" here intentionally, since this marks work
183 # which is "to be done" soon.
190 # which is "to be done" soon.
184 skip_marker = node.get_closest_marker('xfail_backends')
191 skip_marker = node.get_closest_marker('xfail_backends')
185 if skip_marker and backend_alias in skip_marker.args:
192 if skip_marker and backend_alias in skip_marker.args:
186 msg = "Support for backend %s to be developed." % (backend_alias, )
193 msg = "Support for backend %s to be developed." % (backend_alias, )
187 msg = skip_marker.kwargs.get('reason', msg)
194 msg = skip_marker.kwargs.get('reason', msg)
188 pytest.xfail(msg)
195 pytest.xfail(msg)
189
196
190
197
191 def check_skip_backends(node, backend_alias):
198 def check_skip_backends(node, backend_alias):
192 # Using "skip_backends" here intentionally, since this marks work which is
199 # Using "skip_backends" here intentionally, since this marks work which is
193 # not supported.
200 # not supported.
194 skip_marker = node.get_closest_marker('skip_backends')
201 skip_marker = node.get_closest_marker('skip_backends')
195 if skip_marker and backend_alias in skip_marker.args:
202 if skip_marker and backend_alias in skip_marker.args:
196 msg = "Feature not supported for backend %s." % (backend_alias, )
203 msg = "Feature not supported for backend %s." % (backend_alias, )
197 msg = skip_marker.kwargs.get('reason', msg)
204 msg = skip_marker.kwargs.get('reason', msg)
198 pytest.skip(msg)
205 pytest.skip(msg)
199
206
200
207
201 def extract_git_repo_from_dump(dump_name, repo_name):
208 def extract_git_repo_from_dump(dump_name, repo_name):
202 """Create git repo `repo_name` from dump `dump_name`."""
209 """Create git repo `repo_name` from dump `dump_name`."""
203 repos_path = ScmModel().repos_path
210 repos_path = ScmModel().repos_path
204 target_path = os.path.join(repos_path, repo_name)
211 target_path = os.path.join(repos_path, repo_name)
205 rc_testdata.extract_git_dump(dump_name, target_path)
212 rc_testdata.extract_git_dump(dump_name, target_path)
206 return target_path
213 return target_path
207
214
208
215
209 def extract_hg_repo_from_dump(dump_name, repo_name):
216 def extract_hg_repo_from_dump(dump_name, repo_name):
210 """Create hg repo `repo_name` from dump `dump_name`."""
217 """Create hg repo `repo_name` from dump `dump_name`."""
211 repos_path = ScmModel().repos_path
218 repos_path = ScmModel().repos_path
212 target_path = os.path.join(repos_path, repo_name)
219 target_path = os.path.join(repos_path, repo_name)
213 rc_testdata.extract_hg_dump(dump_name, target_path)
220 rc_testdata.extract_hg_dump(dump_name, target_path)
214 return target_path
221 return target_path
215
222
216
223
217 def extract_svn_repo_from_dump(dump_name, repo_name):
224 def extract_svn_repo_from_dump(dump_name, repo_name):
218 """Create a svn repo `repo_name` from dump `dump_name`."""
225 """Create a svn repo `repo_name` from dump `dump_name`."""
219 repos_path = ScmModel().repos_path
226 repos_path = ScmModel().repos_path
220 target_path = os.path.join(repos_path, repo_name)
227 target_path = os.path.join(repos_path, repo_name)
221 SubversionRepository(target_path, create=True)
228 SubversionRepository(target_path, create=True)
222 _load_svn_dump_into_repo(dump_name, target_path)
229 _load_svn_dump_into_repo(dump_name, target_path)
223 return target_path
230 return target_path
224
231
225
232
226 def assert_message_in_log(log_records, message, levelno, module):
233 def assert_message_in_log(log_records, message, levelno, module):
227 messages = [
234 messages = [
228 r.message for r in log_records
235 r.message for r in log_records
229 if r.module == module and r.levelno == levelno
236 if r.module == module and r.levelno == levelno
230 ]
237 ]
231 assert message in messages
238 assert message in messages
232
239
233
240
234 def _load_svn_dump_into_repo(dump_name, repo_path):
241 def _load_svn_dump_into_repo(dump_name, repo_path):
235 """
242 """
236 Utility to populate a svn repository with a named dump
243 Utility to populate a svn repository with a named dump
237
244
238 Currently the dumps are in rc_testdata. They might later on be
245 Currently the dumps are in rc_testdata. They might later on be
239 integrated with the main repository once they stabilize more.
246 integrated with the main repository once they stabilize more.
240 """
247 """
241 dump = rc_testdata.load_svn_dump(dump_name)
248 dump = rc_testdata.load_svn_dump(dump_name)
242 load_dump = subprocess.Popen(
249 load_dump = subprocess.Popen(
243 ['svnadmin', 'load', repo_path],
250 ['svnadmin', 'load', repo_path],
244 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
251 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
245 stderr=subprocess.PIPE)
252 stderr=subprocess.PIPE)
246 out, err = load_dump.communicate(dump)
253 out, err = load_dump.communicate(dump)
247 if load_dump.returncode != 0:
254 if load_dump.returncode != 0:
248 log.error("Output of load_dump command: %s", out)
255 log.error("Output of load_dump command: %s", out)
249 log.error("Error output of load_dump command: %s", err)
256 log.error("Error output of load_dump command: %s", err)
250 raise Exception(
257 raise Exception(
251 'Failed to load dump "%s" into repository at path "%s".'
258 'Failed to load dump "%s" into repository at path "%s".'
252 % (dump_name, repo_path))
259 % (dump_name, repo_path))
253
260
254
261
255 class AssertResponse(object):
262 class AssertResponse(object):
256 """
263 """
257 Utility that helps to assert things about a given HTML response.
264 Utility that helps to assert things about a given HTML response.
258 """
265 """
259
266
260 def __init__(self, response):
267 def __init__(self, response):
261 self.response = response
268 self.response = response
262
269
263 def get_imports(self):
270 def get_imports(self):
264 return fromstring, tostring, CSSSelector
271 return fromstring, tostring, CSSSelector
265
272
266 def one_element_exists(self, css_selector):
273 def one_element_exists(self, css_selector):
267 self.get_element(css_selector)
274 self.get_element(css_selector)
268
275
269 def no_element_exists(self, css_selector):
276 def no_element_exists(self, css_selector):
270 assert not self._get_elements(css_selector)
277 assert not self._get_elements(css_selector)
271
278
272 def element_equals_to(self, css_selector, expected_content):
279 def element_equals_to(self, css_selector, expected_content):
273 element = self.get_element(css_selector)
280 element = self.get_element(css_selector)
274 element_text = self._element_to_string(element)
281 element_text = self._element_to_string(element)
275
282
276 assert expected_content in element_text
283 assert expected_content in element_text
277
284
278 def element_contains(self, css_selector, expected_content):
285 def element_contains(self, css_selector, expected_content):
279 element = self.get_element(css_selector)
286 element = self.get_element(css_selector)
280 assert expected_content in element.text_content()
287 assert expected_content in element.text_content()
281
288
282 def element_value_contains(self, css_selector, expected_content):
289 def element_value_contains(self, css_selector, expected_content):
283 element = self.get_element(css_selector)
290 element = self.get_element(css_selector)
284 assert expected_content in element.value
291 assert expected_content in element.value
285
292
286 def contains_one_link(self, link_text, href):
293 def contains_one_link(self, link_text, href):
287 fromstring, tostring, CSSSelector = self.get_imports()
294 fromstring, tostring, CSSSelector = self.get_imports()
288 doc = fromstring(self.response.body)
295 doc = fromstring(self.response.body)
289 sel = CSSSelector('a[href]')
296 sel = CSSSelector('a[href]')
290 elements = [
297 elements = [
291 e for e in sel(doc) if e.text_content().strip() == link_text]
298 e for e in sel(doc) if e.text_content().strip() == link_text]
292 assert len(elements) == 1, "Did not find link or found multiple links"
299 assert len(elements) == 1, "Did not find link or found multiple links"
293 self._ensure_url_equal(elements[0].attrib.get('href'), href)
300 self._ensure_url_equal(elements[0].attrib.get('href'), href)
294
301
295 def contains_one_anchor(self, anchor_id):
302 def contains_one_anchor(self, anchor_id):
296 fromstring, tostring, CSSSelector = self.get_imports()
303 fromstring, tostring, CSSSelector = self.get_imports()
297 doc = fromstring(self.response.body)
304 doc = fromstring(self.response.body)
298 sel = CSSSelector('#' + anchor_id)
305 sel = CSSSelector('#' + anchor_id)
299 elements = sel(doc)
306 elements = sel(doc)
300 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
307 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
301
308
302 def _ensure_url_equal(self, found, expected):
309 def _ensure_url_equal(self, found, expected):
303 assert _Url(found) == _Url(expected)
310 assert _Url(found) == _Url(expected)
304
311
305 def get_element(self, css_selector):
312 def get_element(self, css_selector):
306 elements = self._get_elements(css_selector)
313 elements = self._get_elements(css_selector)
307 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
314 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
308 return elements[0]
315 return elements[0]
309
316
310 def get_elements(self, css_selector):
317 def get_elements(self, css_selector):
311 return self._get_elements(css_selector)
318 return self._get_elements(css_selector)
312
319
313 def _get_elements(self, css_selector):
320 def _get_elements(self, css_selector):
314 fromstring, tostring, CSSSelector = self.get_imports()
321 fromstring, tostring, CSSSelector = self.get_imports()
315 doc = fromstring(self.response.body)
322 doc = fromstring(self.response.body)
316 sel = CSSSelector(css_selector)
323 sel = CSSSelector(css_selector)
317 elements = sel(doc)
324 elements = sel(doc)
318 return elements
325 return elements
319
326
320 def _element_to_string(self, element):
327 def _element_to_string(self, element):
321 fromstring, tostring, CSSSelector = self.get_imports()
328 fromstring, tostring, CSSSelector = self.get_imports()
322 return tostring(element, encoding='unicode')
329 return tostring(element, encoding='unicode')
323
330
324
331
325 class _Url(object):
332 class _Url(object):
326 """
333 """
327 A url object that can be compared with other url orbjects
334 A url object that can be compared with other url orbjects
328 without regard to the vagaries of encoding, escaping, and ordering
335 without regard to the vagaries of encoding, escaping, and ordering
329 of parameters in query strings.
336 of parameters in query strings.
330
337
331 Inspired by
338 Inspired by
332 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
339 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
333 """
340 """
334
341
335 def __init__(self, url):
342 def __init__(self, url):
336 parts = urllib.parse.urlparse(url)
343 parts = urllib.parse.urlparse(url)
337 _query = frozenset(urllib.parse.parse_qsl(parts.query))
344 _query = frozenset(urllib.parse.parse_qsl(parts.query))
338 _path = unquote_plus(parts.path)
345 _path = unquote_plus(parts.path)
339 parts = parts._replace(query=_query, path=_path)
346 parts = parts._replace(query=_query, path=_path)
340 self.parts = parts
347 self.parts = parts
341
348
342 def __eq__(self, other):
349 def __eq__(self, other):
343 return self.parts == other.parts
350 return self.parts == other.parts
344
351
345 def __hash__(self):
352 def __hash__(self):
346 return hash(self.parts)
353 return hash(self.parts)
347
354
348
355
349 def run_test_concurrently(times, raise_catched_exc=True):
356 def run_test_concurrently(times, raise_catched_exc=True):
350 """
357 """
351 Add this decorator to small pieces of code that you want to test
358 Add this decorator to small pieces of code that you want to test
352 concurrently
359 concurrently
353
360
354 ex:
361 ex:
355
362
356 @test_concurrently(25)
363 @test_concurrently(25)
357 def my_test_function():
364 def my_test_function():
358 ...
365 ...
359 """
366 """
360 def test_concurrently_decorator(test_func):
367 def test_concurrently_decorator(test_func):
361 def wrapper(*args, **kwargs):
368 def wrapper(*args, **kwargs):
362 exceptions = []
369 exceptions = []
363
370
364 def call_test_func():
371 def call_test_func():
365 try:
372 try:
366 test_func(*args, **kwargs)
373 test_func(*args, **kwargs)
367 except Exception as e:
374 except Exception as e:
368 exceptions.append(e)
375 exceptions.append(e)
369 if raise_catched_exc:
376 if raise_catched_exc:
370 raise
377 raise
371 threads = []
378 threads = []
372 for i in range(times):
379 for i in range(times):
373 threads.append(threading.Thread(target=call_test_func))
380 threads.append(threading.Thread(target=call_test_func))
374 for t in threads:
381 for t in threads:
375 t.start()
382 t.start()
376 for t in threads:
383 for t in threads:
377 t.join()
384 t.join()
378 if exceptions:
385 if exceptions:
379 raise Exception(
386 raise Exception(
380 'test_concurrently intercepted %s exceptions: %s' % (
387 'test_concurrently intercepted %s exceptions: %s' % (
381 len(exceptions), exceptions))
388 len(exceptions), exceptions))
382 return wrapper
389 return wrapper
383 return test_concurrently_decorator
390 return test_concurrently_decorator
384
391
385
392
386 def wait_for_url(url, timeout=10):
393 def wait_for_url(url, timeout=10):
387 """
394 """
388 Wait until URL becomes reachable.
395 Wait until URL becomes reachable.
389
396
390 It polls the URL until the timeout is reached or it became reachable.
397 It polls the URL until the timeout is reached or it became reachable.
391 If will call to `py.test.fail` in case the URL is not reachable.
398 If will call to `py.test.fail` in case the URL is not reachable.
392 """
399 """
393 timeout = time.time() + timeout
400 timeout = time.time() + timeout
394 last = 0
401 last = 0
395 wait = 0.1
402 wait = 0.1
396
403
397 while timeout > last:
404 while timeout > last:
398 last = time.time()
405 last = time.time()
399 if is_url_reachable(url, log_exc=False):
406 if is_url_reachable(url, log_exc=False):
400 break
407 break
401 elif (last + wait) > time.time():
408 elif (last + wait) > time.time():
402 # Go to sleep because not enough time has passed since last check.
409 # Go to sleep because not enough time has passed since last check.
403 time.sleep(wait)
410 time.sleep(wait)
404 else:
411 else:
405 pytest.fail(f"Timeout while waiting for URL {url}")
412 pytest.fail(f"Timeout while waiting for URL {url}")
406
413
407
414
408 def is_url_reachable(url: str, log_exc: bool = False) -> bool:
415 def is_url_reachable(url: str, log_exc: bool = False) -> bool:
409 try:
416 try:
410 urllib.request.urlopen(url)
417 urllib.request.urlopen(url)
411 except urllib.error.URLError:
418 except urllib.error.URLError:
412 if log_exc:
419 if log_exc:
413 log.exception(f'URL `{url}` reach error')
420 log.exception(f'URL `{url}` reach error')
414 return False
421 return False
415 return True
422 return True
416
423
417
424
418 def repo_on_filesystem(repo_name):
425 def repo_on_filesystem(repo_name):
419 from rhodecode.lib import vcs
426 from rhodecode.lib import vcs
420 from rhodecode.tests import TESTS_TMP_PATH
427 from rhodecode.tests import TESTS_TMP_PATH
421 repo = vcs.get_vcs_instance(
428 repo = vcs.get_vcs_instance(
422 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
429 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
423 return repo is not None
430 return repo is not None
424
431
425
432
426 def commit_change(
433 def commit_change(
427 repo, filename: bytes, content: bytes, message, vcs_type, parent=None, newfile=False):
434 repo, filename: bytes, content: bytes, message, vcs_type, parent=None, newfile=False):
428 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
435 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
429
436
430 repo = Repository.get_by_repo_name(repo)
437 repo = Repository.get_by_repo_name(repo)
431 _commit = parent
438 _commit = parent
432 if not parent:
439 if not parent:
433 _commit = EmptyCommit(alias=vcs_type)
440 _commit = EmptyCommit(alias=vcs_type)
434
441
435 if newfile:
442 if newfile:
436 nodes = {
443 nodes = {
437 filename: {
444 filename: {
438 'content': content
445 'content': content
439 }
446 }
440 }
447 }
441 commit = ScmModel().create_nodes(
448 commit = ScmModel().create_nodes(
442 user=TEST_USER_ADMIN_LOGIN, repo=repo,
449 user=TEST_USER_ADMIN_LOGIN, repo=repo,
443 message=message,
450 message=message,
444 nodes=nodes,
451 nodes=nodes,
445 parent_commit=_commit,
452 parent_commit=_commit,
446 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
453 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
447 )
454 )
448 else:
455 else:
449 commit = ScmModel().commit_change(
456 commit = ScmModel().commit_change(
450 repo=repo.scm_instance(), repo_name=repo.repo_name,
457 repo=repo.scm_instance(), repo_name=repo.repo_name,
451 commit=parent, user=TEST_USER_ADMIN_LOGIN,
458 commit=parent, user=TEST_USER_ADMIN_LOGIN,
452 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
459 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
453 message=message,
460 message=message,
454 content=content,
461 content=content,
455 f_path=filename
462 f_path=filename
456 )
463 )
457 return commit
464 return commit
458
465
459
466
460 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
467 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
461 if not default:
468 if not default:
462 raise ValueError('Permission for default user must be given')
469 raise ValueError('Permission for default user must be given')
463 form_data = [(
470 form_data = [(
464 'csrf_token', csrf_token
471 'csrf_token', csrf_token
465 )]
472 )]
466 # add default
473 # add default
467 form_data.extend([
474 form_data.extend([
468 ('u_perm_1', default)
475 ('u_perm_1', default)
469 ])
476 ])
470
477
471 if grant:
478 if grant:
472 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
479 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
473 form_data.extend([
480 form_data.extend([
474 ('perm_new_member_perm_new{}'.format(cnt), perm),
481 ('perm_new_member_perm_new{}'.format(cnt), perm),
475 ('perm_new_member_id_new{}'.format(cnt), obj_id),
482 ('perm_new_member_id_new{}'.format(cnt), obj_id),
476 ('perm_new_member_name_new{}'.format(cnt), obj_name),
483 ('perm_new_member_name_new{}'.format(cnt), obj_name),
477 ('perm_new_member_type_new{}'.format(cnt), obj_type),
484 ('perm_new_member_type_new{}'.format(cnt), obj_type),
478
485
479 ])
486 ])
480 if revoke:
487 if revoke:
481 for obj_id, obj_type in revoke:
488 for obj_id, obj_type in revoke:
482 form_data.extend([
489 form_data.extend([
483 ('perm_del_member_id_{}'.format(obj_id), obj_id),
490 ('perm_del_member_id_{}'.format(obj_id), obj_id),
484 ('perm_del_member_type_{}'.format(obj_id), obj_type),
491 ('perm_del_member_type_{}'.format(obj_id), obj_type),
485 ])
492 ])
486 return form_data
493 return form_data
General Comments 0
You need to be logged in to leave comments. Login now