##// END OF EJS Templates
fix(celery): remove warning about retry connection flag
fix(celery): remove warning about retry connection flag

File last commit:

r5198:919dd05c default
r5291:c778c694 default
Show More
utils.py
494 lines | 16.0 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2010-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import threading
import time
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 import sys
project: added all source files and assets
r1 import logging
import os.path
python3: remove usage of subprocess32
r4926 import subprocess
tests: use custom test app for better debug of .mustcontain function....
r1256 import tempfile
tests: multiple tests cases fixes for python3
r4994 import urllib.request
import urllib.error
import urllib.parse
dependencies: bumped lxml and made it a dependency.
r1525 from lxml.html import fromstring, tostring
from lxml.cssselect import CSSSelector
python3: fix urllib usage
r4914 from urllib.parse import unquote_plus
dependencies: bumped pyramid to 1.9 webob to 1.7.3 and webtest to 2.0.27...
r1906 import webob
project: added all source files and assets
r1
python3: fixed urlparse import
r4919 from webtest.app import TestResponse, TestApp
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190
tests: use custom test app for better debug of .mustcontain function....
r1256
project: added all source files and assets
r1 import pytest
pytest: restructure fixtures/plugins to fix problems with pkg_resources and Can't perform this operation for unregistered loader type errors....
r4986
try:
import rc_testdata
except ImportError:
raise ImportError('Failed to import rc_testdata, '
'please make sure this package is installed from requirements_test.txt')
project: added all source files and assets
r1
tests: use custom test app for better debug of .mustcontain function....
r1256 from rhodecode.model.db import User, Repository
project: added all source files and assets
r1 from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
diffs: compare overhaul....
r1259 from rhodecode.lib.vcs.backends.base import EmptyCommit
testapp: moved login/csrf session methods into TestApp itself....
r2374 from rhodecode.tests import login_user_session
project: added all source files and assets
r1
log = logging.getLogger(__name__)
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
def print_to_func(value, print_to=sys.stderr):
    """Print ``value`` to the stream given as ``print_to`` (stderr by default)."""
    print(value, file=print_to)
tests: use custom test app for better debug of .mustcontain function....
class CustomTestResponse(TestResponse):
    """
    Test response with a ``mustcontain`` that is easier to debug.

    Each ``mustcontain`` call dumps ``str(self)`` into a temporary file and
    reports the path of that file when a check fails.
    """

    def _save_output(self, out):
        """
        Write ``out`` into a new ``rc-test-*.html`` temporary file.

        The file is created with ``delete=False`` so it stays on disk for
        later inspection.

        :param out: text to store
        :return: path of the created file
        """
        # The context manager closes (and thereby flushes) the handle before
        # the path is handed out, so no open file object is leaked.
        with tempfile.NamedTemporaryFile(
                mode='w', delete=False, prefix='rc-test-', suffix='.html') as f:
            f.write(out)
        return f.name

    def mustcontain(self, *strings, **kw):
        """
        Assert that the response contains all the strings passed
        in as arguments.

        Equivalent to::

            assert string in res

        Supported keyword arguments:

        - ``no``: a string, or an iterable of strings, that must NOT be
          contained in the response
        - ``print_body``: when true, the response is also printed on failure
        - ``print_to``: stream receiving the failure output, ``sys.stderr``
          by default

        :raises TypeError: if any other keyword argument is passed
        :raises IndexError: if an expected string is missing, or a string
            given via ``no`` is present
        """
        print_body = kw.pop('print_body', False)
        print_to = kw.pop('print_to', sys.stderr)

        no = kw.pop('no', [])
        if isinstance(no, str):
            # a single string is treated as a one element list
            no = [no]

        if kw:
            raise TypeError(f"The only keyword argument allowed is 'no' got {kw}")

        f = self._save_output(str(self))

        for s in strings:
            if s not in self:
                print_to_func(f"Actual response (no {s!r}):", print_to=print_to)
                print_to_func(f"body output saved as `{f}`", print_to=print_to)
                if print_body:
                    print_to_func(str(self), print_to=print_to)
                raise IndexError(f"Body does not contain string {s!r}, body output saved as {f}")

        for no_s in no:
            if no_s in self:
                print_to_func(f"Actual response (has {no_s!r})", print_to=print_to)
                print_to_func(f"body output saved as `{f}`", print_to=print_to)
                if print_body:
                    print_to_func(str(self), print_to=print_to)
                raise IndexError(f"Body contains bad string {no_s!r}, body output saved as {f}")

    def assert_response(self):
        """Return an ``AssertResponse`` wrapping this response."""
        return AssertResponse(self)

    def get_session_from_response(self):
        """
        This returns the session from a response object.

        The session factory is built from the pyramid settings of the test
        app and then called with the request of this response.
        """
        from rhodecode.lib.rc_beaker import session_factory_from_settings
        session = session_factory_from_settings(self.test_app._pyramid_settings)
        return session(self.request)
home: moved home and repo group views into pyramid....
r1774
tests: use custom test app for better debug of .mustcontain function....
r1256
dependencies: bumped pyramid to 1.9 webob to 1.7.3 and webtest to 2.0.27...
class TestRequest(webob.BaseRequest):
    """Request class of the custom test app, using ``CustomTestResponse``."""

    # for py.test, so it doesn't try to run this class because its name
    # starts with test...
    disabled = True
    ResponseClass = CustomTestResponse

    def add_response_callback(self, callback):
        """Accept a response callback and ignore it."""
        pass

    @classmethod
    def blank(cls, path, environ=None, base_url=None,
              headers=None, POST=None, **kw):
        """
        Create a blank request for ``path``.

        A path with non-ascii characters is quoted first; everything else is
        passed on to the parent implementation unchanged.
        """
        # our custom quote path if it contains non-ascii chars
        request_path = path if path.isascii() else urllib.parse.quote(path)
        return super().blank(
            request_path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
tests: use custom test app for better debug of .mustcontain function....
r1256
class CustomTestApp(TestApp):
    """
    Custom app to make mustcontain more Useful, and extract special methods
    """
    RequestClass = TestRequest
    # NOTE: defined on the class; `login` mutates this dict in place, so the
    # stored csrf token is visible through every instance sharing it.
    rc_login_data = {}
    rc_current_session = None

    def login(self, username=None, password=None):
        """
        Log in through ``login_user_session`` and remember the session.

        Credentials are only passed on when both ``username`` and
        ``password`` are given. The csrf token of the new session is stored
        in ``rc_login_data``.

        :return: the ``rhodecode_user`` entry of the session
        """
        from rhodecode.lib import auth

        credentials = (username, password) if username and password else ()
        session = login_user_session(self, *credentials)

        self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
        self.rc_current_session = session
        return session['rhodecode_user']

    @property
    def csrf_token(self):
        """Csrf token stored by the last ``login`` call."""
        return self.rc_login_data['csrf_token']

    @property
    def _pyramid_registry(self):
        """Registry taken from the config of the wrapped app."""
        return self.app.config.registry

    @property
    def _pyramid_settings(self):
        """Settings of the pyramid registry."""
        return self._pyramid_registry.settings

    def do_request(self, req, status=None, expect_errors=None):
        """Hook around the parent ``do_request``; currently a pass-through."""
        # you can put custom code here
        return super().do_request(req, status, expect_errors)
tests: use custom test app for better debug of .mustcontain function....
r1256
project: added all source files and assets
def set_anonymous_access(enabled):
    """(Dis)allows anonymous access depending on parameter `enabled`"""
    default_user = User.get_default_user()
    default_user.active = enabled
    Session().add(default_user)
    Session().commit()

    # must sleep for cache (1s to expire)
    time.sleep(1.5)
    log.info('anonymous access is now: %s', enabled)

    # re-read the default user to verify the flag really got stored
    assert enabled == User.get_default_user().active, (
        'Cannot set anonymous access')
def check_xfail_backends(node, backend_alias):
    """
    Call ``pytest.xfail`` if `node` is marked with ``xfail_backends`` and
    `backend_alias` is one of the marker arguments.

    The ``reason`` keyword of the marker, when given, replaces the default
    message.
    """
    # Using "xfail_backends" here intentionally, since this marks work
    # which is "to be done" soon.
    marker = node.get_closest_marker('xfail_backends')
    if not marker or backend_alias not in marker.args:
        return

    default_msg = "Support for backend %s to be developed." % (backend_alias, )
    pytest.xfail(marker.kwargs.get('reason', default_msg))
def check_skip_backends(node, backend_alias):
    """
    Call ``pytest.skip`` if `node` is marked with ``skip_backends`` and
    `backend_alias` is one of the marker arguments.

    The ``reason`` keyword of the marker, when given, replaces the default
    message.
    """
    # Using "skip_backends" here intentionally, since this marks work which is
    # not supported.
    marker = node.get_closest_marker('skip_backends')
    if not marker or backend_alias not in marker.args:
        return

    default_msg = "Feature not supported for backend %s." % (backend_alias, )
    pytest.skip(marker.kwargs.get('reason', default_msg))
def extract_git_repo_from_dump(dump_name, repo_name):
    """
    Create git repo `repo_name` from dump `dump_name`.

    The repo is extracted below ``ScmModel().repos_path``; the resulting
    path is returned.
    """
    destination = os.path.join(ScmModel().repos_path, repo_name)
    rc_testdata.extract_git_dump(dump_name, destination)
    return destination
def extract_hg_repo_from_dump(dump_name, repo_name):
    """
    Create hg repo `repo_name` from dump `dump_name`.

    The repo is extracted below ``ScmModel().repos_path``; the resulting
    path is returned.
    """
    destination = os.path.join(ScmModel().repos_path, repo_name)
    rc_testdata.extract_hg_dump(dump_name, destination)
    return destination
def extract_svn_repo_from_dump(dump_name, repo_name):
    """
    Create a svn repo `repo_name` from dump `dump_name`.

    An empty repository is created below ``ScmModel().repos_path`` first,
    then the dump is loaded into it; the repository path is returned.
    """
    destination = os.path.join(ScmModel().repos_path, repo_name)
    SubversionRepository(destination, create=True)
    _load_svn_dump_into_repo(dump_name, destination)
    return destination
def assert_message_in_log(log_records, message, levelno, module):
    """
    Assert that `message` was logged from `module` with level `levelno`.

    Only records whose ``module`` and ``levelno`` both match are considered;
    `message` has to be equal to the ``message`` of one of them.
    """
    candidates = []
    for record in log_records:
        if record.module == module and record.levelno == levelno:
            candidates.append(record.message)
    assert message in candidates
def _load_svn_dump_into_repo(dump_name, repo_path):
    """
    Utility to populate a svn repository with a named dump

    Currently the dumps are in rc_testdata. They might later on be
    integrated with the main repository once they stabilize more.

    The dump is fed to ``svnadmin load`` via stdin. On a non-zero exit code
    both output streams are logged and an ``Exception`` is raised.
    """
    dump_data = rc_testdata.load_svn_dump(dump_name)
    command = ['svnadmin', 'load', repo_path]
    process = subprocess.Popen(
        command,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    stdout_data, stderr_data = process.communicate(dump_data)

    if process.returncode == 0:
        return

    log.error("Output of load_dump command: %s", stdout_data)
    log.error("Error output of load_dump command: %s", stderr_data)
    raise Exception(
        'Failed to load dump "%s" into repository at path "%s".'
        % (dump_name, repo_path))
class AssertResponse(object):
    """
    Utility that helps to assert things about a given HTML response.

    All checks parse ``response.body`` and select elements via CSS selectors.
    """

    def __init__(self, response):
        self.response = response

    def get_imports(self):
        """Return the ``(fromstring, tostring, CSSSelector)`` helpers."""
        return fromstring, tostring, CSSSelector

    def one_element_exists(self, css_selector):
        """Assert that exactly one element matches `css_selector`."""
        self.get_element(css_selector)

    def no_element_exists(self, css_selector):
        """Assert that no element matches `css_selector`."""
        assert not self._get_elements(css_selector)

    def element_equals_to(self, css_selector, expected_content):
        """
        Assert that `expected_content` is contained in the serialized
        markup of the single element matching `css_selector`.
        """
        markup = self._element_to_string(self.get_element(css_selector))
        assert expected_content in markup

    def element_contains(self, css_selector, expected_content):
        """
        Assert that `expected_content` is contained in the text content of
        the single element matching `css_selector`.
        """
        matched = self.get_element(css_selector)
        assert expected_content in matched.text_content()

    def element_value_contains(self, css_selector, expected_content):
        """
        Assert that `expected_content` is contained in the ``value`` of the
        single element matching `css_selector`.
        """
        matched = self.get_element(css_selector)
        assert expected_content in matched.value

    def contains_one_link(self, link_text, href):
        """
        Assert that exactly one ``a[href]`` element has the stripped text
        `link_text`, and that its href is equal to `href` as a url.
        """
        parse_html, _tostring, selector_cls = self.get_imports()
        document = parse_html(self.response.body)
        find_links = selector_cls('a[href]')
        matching_links = []
        for link in find_links(document):
            if link.text_content().strip() == link_text:
                matching_links.append(link)
        assert len(matching_links) == 1, "Did not find link or found multiple links"
        self._ensure_url_equal(matching_links[0].attrib.get('href'), href)

    def contains_one_anchor(self, anchor_id):
        """Assert that exactly one element has the id `anchor_id`."""
        parse_html, _tostring, selector_cls = self.get_imports()
        document = parse_html(self.response.body)
        find_anchor = selector_cls('#' + anchor_id)
        found = find_anchor(document)
        assert len(found) == 1, 'cannot find 1 element {}'.format(anchor_id)

    def _ensure_url_equal(self, found, expected):
        """Assert that both urls are equal when compared as ``_Url``."""
        assert _Url(found) == _Url(expected)

    def get_element(self, css_selector):
        """Return the single element matching `css_selector`."""
        found = self._get_elements(css_selector)
        assert len(found) == 1, 'cannot find 1 element {}'.format(css_selector)
        return found[0]

    def get_elements(self, css_selector):
        """Return all elements matching `css_selector`."""
        return self._get_elements(css_selector)

    def _get_elements(self, css_selector):
        """Parse the response body and apply `css_selector` to it."""
        parse_html, _tostring, selector_cls = self.get_imports()
        document = parse_html(self.response.body)
        return selector_cls(css_selector)(document)

    def _element_to_string(self, element):
        """Serialize `element` into a unicode string."""
        _parse_html, serialize, _selector_cls = self.get_imports()
        return serialize(element, encoding='unicode')
project: added all source files and assets
r1
class _Url(object):
"""
A url object that can be compared with other url orbjects
without regard to the vagaries of encoding, escaping, and ordering
of parameters in query strings.
Inspired by
http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
"""
def __init__(self, url):
python3: fixed urlparse import
r4919 parts = urllib.parse.urlparse(url)
_query = frozenset(urllib.parse.parse_qsl(parts.query))
project: added all source files and assets
r1 _path = unquote_plus(parts.path)
parts = parts._replace(query=_query, path=_path)
self.parts = parts
def __eq__(self, other):
return self.parts == other.parts
def __hash__(self):
return hash(self.parts)
def run_test_concurrently(times, raise_catched_exc=True):
    """
    Add this decorator to small pieces of code that you want to test
    concurrently

    ex:

    @run_test_concurrently(25)
    def my_test_function():
        ...

    The decorated function is called once in each of `times` threads with
    the arguments given to the wrapper. All threads are started first and
    joined afterwards.

    :param times: number of threads to run the function in
    :param raise_catched_exc: if true, an exception is re-raised inside its
        worker thread after it got recorded
    :raises Exception: from the wrapper, when at least one call raised;
        the message lists the recorded exceptions
    """
    # local import, the decorator is the only user of it in this block
    from functools import wraps

    def test_concurrently_decorator(test_func):
        # keep name, docstring and signature of the decorated function
        @wraps(test_func)
        def wrapper(*args, **kwargs):
            exceptions = []

            def call_test_func():
                try:
                    test_func(*args, **kwargs)
                except Exception as e:
                    exceptions.append(e)
                    if raise_catched_exc:
                        raise

            threads = [
                threading.Thread(target=call_test_func) for _ in range(times)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            if exceptions:
                raise Exception(
                    'test_concurrently intercepted %s exceptions: %s' % (
                        len(exceptions), exceptions))
        return wrapper
    return test_concurrently_decorator
def wait_for_url(url, timeout=10):
    """
    Wait until URL becomes reachable.

    It polls the URL until the timeout is reached or it became reachable.
    It will call `pytest.fail` in case the URL is not reachable.
    """
    deadline = time.time() + timeout
    pause = 0.1
    checked_at = 0

    while deadline > checked_at:
        checked_at = time.time()
        if is_url_reachable(url, log_exc=False):
            return
        if (checked_at + pause) > time.time():
            # Go to sleep because not enough time has passed since last check.
            time.sleep(pause)

    # only reached when the loop ran out of time without a successful check
    pytest.fail(f"Timeout while waiting for URL {url}")
project: added all source files and assets
r1
tests: fixed all tests for python3 BIG changes
def is_url_reachable(url: str, log_exc: bool = False) -> bool:
    """
    Check if `url` can be opened with ``urllib.request.urlopen``.

    :param url: url to open
    :param log_exc: if true, a failed attempt is logged together with the
        exception
    :return: ``False`` if opening raised ``urllib.error.URLError``,
        ``True`` otherwise
    """
    try:
        # Only the reachability matters; close the response right away so
        # the underlying handle is not leaked.
        with urllib.request.urlopen(url):
            pass
    except urllib.error.URLError:
        if log_exc:
            log.exception('URL `%s` reach error', url)
        return False
    return True
pytest: Fix the user login function....
r41
Martin Bornhold
tests: Fix a helper method that checks if a repository is available on filesystem....
def repo_on_filesystem(repo_name):
    """
    Tell if a vcs instance can be obtained for `repo_name` below
    ``TESTS_TMP_PATH`` (without creating it).
    """
    from rhodecode.lib import vcs
    from rhodecode.tests import TESTS_TMP_PATH

    repo_path = os.path.join(TESTS_TMP_PATH, repo_name)
    return vcs.get_vcs_instance(repo_path, create=False) is not None
diffs: compare overhaul....
r1259
def commit_change(
        repo, filename: bytes, content: bytes, message, vcs_type, parent=None, branch=None, newfile=False):
    """
    Commit `content` for `filename` into the repository named `repo`.

    With `newfile` set, the file is added via ``ScmModel.create_nodes`` on
    top of `parent`, or on top of an ``EmptyCommit`` when no parent is
    given. Otherwise ``ScmModel.commit_change`` is called with `parent` and
    `branch`.

    :return: the value returned by the used ``ScmModel`` method
    """
    from rhodecode.tests import TEST_USER_ADMIN_LOGIN

    repo = Repository.get_by_repo_name(repo)
    base_commit = parent if parent else EmptyCommit(alias=vcs_type)

    if not newfile:
        # NOTE: the original `parent` is passed here, not the EmptyCommit
        # fallback built above.
        return ScmModel().commit_change(
            repo=repo.scm_instance(), repo_name=repo.repo_name,
            commit=parent, user=TEST_USER_ADMIN_LOGIN,
            author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
            message=message,
            content=content,
            f_path=filename,
            branch=branch
        )

    new_nodes = {filename: {'content': content}}
    return ScmModel().create_nodes(
        user=TEST_USER_ADMIN_LOGIN, repo=repo,
        message=message,
        nodes=new_nodes,
        parent_commit=base_commit,
        author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
    )
tests: added tests for permission update views to catch obvious form errors.
r2827
def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
    """
    Build the list of ``(field, value)`` pairs of a permission update form.

    :param csrf_token: value of the ``csrf_token`` field
    :param default: permission of the default user, stored as ``u_perm_1``
    :param grant: optional iterable of ``(obj_id, perm, obj_name, obj_type)``
        entries, numbered starting from 1
    :param revoke: optional iterable of ``(obj_id, obj_type)`` entries
    :raises ValueError: if `default` is empty
    :return: list of form data tuples
    """
    if not default:
        raise ValueError('Permission for default user must be given')

    form_data = [
        ('csrf_token', csrf_token),
        # add default
        ('u_perm_1', default),
    ]

    for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant or (), 1):
        form_data.append(('perm_new_member_perm_new{}'.format(cnt), perm))
        form_data.append(('perm_new_member_id_new{}'.format(cnt), obj_id))
        form_data.append(('perm_new_member_name_new{}'.format(cnt), obj_name))
        form_data.append(('perm_new_member_type_new{}'.format(cnt), obj_type))

    for obj_id, obj_type in (revoke or ()):
        form_data.append(('perm_del_member_id_{}'.format(obj_id), obj_id))
        form_data.append(('perm_del_member_type_{}'.format(obj_id), obj_type))

    return form_data