##// END OF EJS Templates
fix(caching): fixed problems with Cache query for users....
fix(caching): fixed problems with Cache query for users. The old way of querying caused the user get query to be always cached, and returning old results even in 2fa forms. The new limited query doesn't cache the user object resolving issues

File last commit:

r5198:919dd05c default
r5365:ae8a165b default
Show More
utils.py
494 lines | 16.0 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2010-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import threading
import time
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 import sys
project: added all source files and assets
r1 import logging
import os.path
python3: remove usage of subprocess32
r4926 import subprocess
tests: use custom test app for better debug of .mustcontain function....
r1256 import tempfile
tests: multiple tests cases fixes for python3
r4994 import urllib.request
import urllib.error
import urllib.parse
dependencies: bumped lxml and made it a dependency.
r1525 from lxml.html import fromstring, tostring
from lxml.cssselect import CSSSelector
python3: fix urllib usage
r4914 from urllib.parse import unquote_plus
dependencies: bumped pyramid to 1.9 webob to 1.7.3 and webtest to 2.0.27...
r1906 import webob
project: added all source files and assets
r1
python3: fixed urlparse import
r4919 from webtest.app import TestResponse, TestApp
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190
tests: use custom test app for better debug of .mustcontain function....
r1256
project: added all source files and assets
r1 import pytest
pytest: restructure fixtures/plugins to fix problems with pkg_resources and Can't perform this operation for unregistered loader type errors....
r4986
try:
import rc_testdata
except ImportError:
raise ImportError('Failed to import rc_testdata, '
'please make sure this package is installed from requirements_test.txt')
project: added all source files and assets
r1
tests: use custom test app for better debug of .mustcontain function....
r1256 from rhodecode.model.db import User, Repository
project: added all source files and assets
r1 from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
diffs: compare overhaul....
r1259 from rhodecode.lib.vcs.backends.base import EmptyCommit
testapp: moved login/csrf session methods into TestApp itself....
r2374 from rhodecode.tests import login_user_session
project: added all source files and assets
r1
log = logging.getLogger(__name__)
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 def print_to_func(value, print_to=sys.stderr):
print(value, file=print_to)
tests: use custom test app for better debug of .mustcontain function....
r1256 class CustomTestResponse(TestResponse):
tests: fixed compare page and related tests....
r3773
tests: use custom test app for better debug of .mustcontain function....
r1256 def _save_output(self, out):
tests: multiple tests cases fixes for python3
r4994 f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html')
tests: use custom test app for better debug of .mustcontain function....
r1256 f.write(out)
return f.name
def mustcontain(self, *strings, **kw):
"""
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 Assert that the response contains all the strings passed
tests: use custom test app for better debug of .mustcontain function....
r1256 in as arguments.
Equivalent to::
assert string in res
"""
tests: fixed compare page and related tests....
r3773 print_body = kw.pop('print_body', False)
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 print_to = kw.pop('print_to', sys.stderr)
tests: use custom test app for better debug of .mustcontain function....
r1256 if 'no' in kw:
no = kw['no']
del kw['no']
python3: fixed urlparse import
r4919 if isinstance(no, str):
tests: use custom test app for better debug of .mustcontain function....
r1256 no = [no]
else:
no = []
if kw:
tests: fixed all tests for python3 BIG changes
r5087 raise TypeError(f"The only keyword argument allowed is 'no' got {kw}")
tests: use custom test app for better debug of .mustcontain function....
r1256
f = self._save_output(str(self))
for s in strings:
tests: multiple tests cases fixes for python3
r4994 if s not in self:
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 print_to_func(f"Actual response (no {s!r}):", print_to=print_to)
print_to_func(f"body output saved as `{f}`", print_to=print_to)
tests: fixed compare page and related tests....
r3773 if print_body:
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 print_to_func(str(self), print_to=print_to)
tests: fixed all tests for python3 BIG changes
r5087 raise IndexError(f"Body does not contain string {s!r}, body output saved as {f}")
tests: use custom test app for better debug of .mustcontain function....
r1256
for no_s in no:
if no_s in self:
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 print_to_func(f"Actual response (has {no_s!r})", print_to=print_to)
print_to_func(f"body output saved as `{f}`", print_to=print_to)
tests: fixed compare page and related tests....
r3773 if print_body:
chore(tests): add more flexibility to where we want to print mustcontain output for testing response
r5190 print_to_func(str(self), print_to=print_to)
tests: fixed all tests for python3 BIG changes
r5087 raise IndexError(f"Body contains bad string {no_s!r}, body output saved as {f}")
tests: use custom test app for better debug of .mustcontain function....
r1256
def assert_response(self):
return AssertResponse(self)
home: moved home and repo group views into pyramid....
r1774 def get_session_from_response(self):
"""
pylons: remove pylons as dependency...
r2351 This returns the session from a response object.
home: moved home and repo group views into pyramid....
r1774 """
tests: fixed pyramid_beaker import
r3765 from rhodecode.lib.rc_beaker import session_factory_from_settings
file-uploads: created simple upload capabilities....
r3432 session = session_factory_from_settings(self.test_app._pyramid_settings)
pylons: remove pylons as dependency...
r2351 return session(self.request)
home: moved home and repo group views into pyramid....
r1774
tests: use custom test app for better debug of .mustcontain function....
r1256
dependencies: bumped pyramid to 1.9 webob to 1.7.3 and webtest to 2.0.27...
r1906 class TestRequest(webob.BaseRequest):
tests: use custom test app for better debug of .mustcontain function....
r1256
tests: multiple tests cases fixes for python3
r4994 # for py.test, so it doesn't try to run this tas by name starting with test...
tests: use custom test app for better debug of .mustcontain function....
r1256 disabled = True
ResponseClass = CustomTestResponse
pylons: remove pylons as dependency...
r2351 def add_response_callback(self, callback):
pass
tests: multiple tests cases fixes for python3
r4994 @classmethod
def blank(cls, path, environ=None, base_url=None,
headers=None, POST=None, **kw):
if not path.isascii():
# our custom quote path if it contains non-ascii chars
path = urllib.parse.quote(path)
return super(TestRequest, cls).blank(
path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
tests: use custom test app for better debug of .mustcontain function....
r1256
class CustomTestApp(TestApp):
"""
spelling: fixed usefull -> useful
r3465 Custom app to make mustcontain more Useful, and extract special methods
tests: use custom test app for better debug of .mustcontain function....
r1256 """
RequestClass = TestRequest
testapp: moved login/csrf session methods into TestApp itself....
r2374 rc_login_data = {}
rc_current_session = None
def login(self, username=None, password=None):
from rhodecode.lib import auth
if username and password:
session = login_user_session(self, username, password)
else:
session = login_user_session(self)
self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
self.rc_current_session = session
return session['rhodecode_user']
@property
def csrf_token(self):
return self.rc_login_data['csrf_token']
tests: use custom test app for better debug of .mustcontain function....
r1256
file-uploads: created simple upload capabilities....
r3432 @property
def _pyramid_registry(self):
return self.app.config.registry
@property
def _pyramid_settings(self):
return self._pyramid_registry.settings
tests: fixed all tests for python3 BIG changes
r5087 def do_request(self, req, status=None, expect_errors=None):
# you can put custom code here
return super().do_request(req, status, expect_errors)
tests: use custom test app for better debug of .mustcontain function....
r1256
project: added all source files and assets
r1 def set_anonymous_access(enabled):
"""(Dis)allows anonymous access depending on parameter `enabled`"""
user = User.get_default_user()
user.active = enabled
Session().add(user)
Session().commit()
repo-summary: re-implemented summary view as pyramid....
r1785 time.sleep(1.5) # must sleep for cache (1s to expire)
project: added all source files and assets
r1 log.info('anonymous access is now: %s', enabled)
assert enabled == User.get_default_user().active, (
'Cannot set anonymous access')
def check_xfail_backends(node, backend_alias):
# Using "xfail_backends" here intentionally, since this marks work
# which is "to be done" soon.
dan
tests: fixed some pytest deprecated calls, and warnings.
r3098 skip_marker = node.get_closest_marker('xfail_backends')
project: added all source files and assets
r1 if skip_marker and backend_alias in skip_marker.args:
msg = "Support for backend %s to be developed." % (backend_alias, )
msg = skip_marker.kwargs.get('reason', msg)
pytest.xfail(msg)
def check_skip_backends(node, backend_alias):
# Using "skip_backends" here intentionally, since this marks work which is
# not supported.
dan
tests: fixed some pytest deprecated calls, and warnings.
r3098 skip_marker = node.get_closest_marker('skip_backends')
project: added all source files and assets
r1 if skip_marker and backend_alias in skip_marker.args:
msg = "Feature not supported for backend %s." % (backend_alias, )
msg = skip_marker.kwargs.get('reason', msg)
pytest.skip(msg)
def extract_git_repo_from_dump(dump_name, repo_name):
"""Create git repo `repo_name` from dump `dump_name`."""
repos_path = ScmModel().repos_path
target_path = os.path.join(repos_path, repo_name)
rc_testdata.extract_git_dump(dump_name, target_path)
return target_path
def extract_hg_repo_from_dump(dump_name, repo_name):
"""Create hg repo `repo_name` from dump `dump_name`."""
repos_path = ScmModel().repos_path
target_path = os.path.join(repos_path, repo_name)
rc_testdata.extract_hg_dump(dump_name, target_path)
return target_path
def extract_svn_repo_from_dump(dump_name, repo_name):
"""Create a svn repo `repo_name` from dump `dump_name`."""
repos_path = ScmModel().repos_path
target_path = os.path.join(repos_path, repo_name)
SubversionRepository(target_path, create=True)
_load_svn_dump_into_repo(dump_name, target_path)
return target_path
def assert_message_in_log(log_records, message, levelno, module):
messages = [
r.message for r in log_records
if r.module == module and r.levelno == levelno
]
assert message in messages
def _load_svn_dump_into_repo(dump_name, repo_path):
"""
Utility to populate a svn repository with a named dump
Currently the dumps are in rc_testdata. They might later on be
integrated with the main repository once they stabilize more.
"""
dump = rc_testdata.load_svn_dump(dump_name)
python3: remove usage of subprocess32
r4926 load_dump = subprocess.Popen(
project: added all source files and assets
r1 ['svnadmin', 'load', repo_path],
python3: remove usage of subprocess32
r4926 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
project: added all source files and assets
r1 out, err = load_dump.communicate(dump)
if load_dump.returncode != 0:
log.error("Output of load_dump command: %s", out)
log.error("Error output of load_dump command: %s", err)
raise Exception(
'Failed to load dump "%s" into repository at path "%s".'
% (dump_name, repo_path))
class AssertResponse(object):
"""
Utility that helps to assert things about a given HTML response.
"""
def __init__(self, response):
self.response = response
test: moved lxml to local imports since this only exists during tests.
r1239 def get_imports(self):
return fromstring, tostring, CSSSelector
project: added all source files and assets
r1 def one_element_exists(self, css_selector):
self.get_element(css_selector)
def no_element_exists(self, css_selector):
assert not self._get_elements(css_selector)
def element_equals_to(self, css_selector, expected_content):
element = self.get_element(css_selector)
element_text = self._element_to_string(element)
tests: multiple tests cases fixes for python3
r4994
project: added all source files and assets
r1 assert expected_content in element_text
def element_contains(self, css_selector, expected_content):
element = self.get_element(css_selector)
assert expected_content in element.text_content()
Martin Bornhold
tests: Add a method to AssertResponse to check value of elements.
r1046 def element_value_contains(self, css_selector, expected_content):
element = self.get_element(css_selector)
assert expected_content in element.value
project: added all source files and assets
r1 def contains_one_link(self, link_text, href):
test: moved lxml to local imports since this only exists during tests.
r1239 fromstring, tostring, CSSSelector = self.get_imports()
project: added all source files and assets
r1 doc = fromstring(self.response.body)
sel = CSSSelector('a[href]')
elements = [
e for e in sel(doc) if e.text_content().strip() == link_text]
assert len(elements) == 1, "Did not find link or found multiple links"
self._ensure_url_equal(elements[0].attrib.get('href'), href)
def contains_one_anchor(self, anchor_id):
test: moved lxml to local imports since this only exists during tests.
r1239 fromstring, tostring, CSSSelector = self.get_imports()
project: added all source files and assets
r1 doc = fromstring(self.response.body)
sel = CSSSelector('#' + anchor_id)
elements = sel(doc)
tests: improved test utils....
r1442 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
project: added all source files and assets
r1
def _ensure_url_equal(self, found, expected):
assert _Url(found) == _Url(expected)
def get_element(self, css_selector):
elements = self._get_elements(css_selector)
tests: improved test utils....
r1442 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
project: added all source files and assets
r1 return elements[0]
file-browser: refactor how we load metadata for file trees....
r423 def get_elements(self, css_selector):
return self._get_elements(css_selector)
project: added all source files and assets
r1 def _get_elements(self, css_selector):
test: moved lxml to local imports since this only exists during tests.
r1239 fromstring, tostring, CSSSelector = self.get_imports()
project: added all source files and assets
r1 doc = fromstring(self.response.body)
sel = CSSSelector(css_selector)
elements = sel(doc)
return elements
def _element_to_string(self, element):
test: moved lxml to local imports since this only exists during tests.
r1239 fromstring, tostring, CSSSelector = self.get_imports()
tests: multiple tests cases fixes for python3
r4994 return tostring(element, encoding='unicode')
project: added all source files and assets
r1
class _Url(object):
"""
A url object that can be compared with other url orbjects
without regard to the vagaries of encoding, escaping, and ordering
of parameters in query strings.
Inspired by
http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
"""
def __init__(self, url):
python3: fixed urlparse import
r4919 parts = urllib.parse.urlparse(url)
_query = frozenset(urllib.parse.parse_qsl(parts.query))
project: added all source files and assets
r1 _path = unquote_plus(parts.path)
parts = parts._replace(query=_query, path=_path)
self.parts = parts
def __eq__(self, other):
return self.parts == other.parts
def __hash__(self):
return hash(self.parts)
def run_test_concurrently(times, raise_catched_exc=True):
"""
Add this decorator to small pieces of code that you want to test
concurrently
ex:
@test_concurrently(25)
def my_test_function():
...
"""
def test_concurrently_decorator(test_func):
def wrapper(*args, **kwargs):
exceptions = []
def call_test_func():
try:
test_func(*args, **kwargs)
test: moved lxml to local imports since this only exists during tests.
r1239 except Exception as e:
project: added all source files and assets
r1 exceptions.append(e)
if raise_catched_exc:
raise
threads = []
for i in range(times):
threads.append(threading.Thread(target=call_test_func))
for t in threads:
t.start()
for t in threads:
t.join()
if exceptions:
raise Exception(
'test_concurrently intercepted %s exceptions: %s' % (
len(exceptions), exceptions))
return wrapper
return test_concurrently_decorator
def wait_for_url(url, timeout=10):
"""
Wait until URL becomes reachable.
It polls the URL until the timeout is reached or it became reachable.
If will call to `py.test.fail` in case the URL is not reachable.
"""
timeout = time.time() + timeout
last = 0
wait = 0.1
test: moved lxml to local imports since this only exists during tests.
r1239 while timeout > last:
project: added all source files and assets
r1 last = time.time()
tests: allow logging exceptions on url reach check
r5023 if is_url_reachable(url, log_exc=False):
project: added all source files and assets
r1 break
test: moved lxml to local imports since this only exists during tests.
r1239 elif (last + wait) > time.time():
project: added all source files and assets
r1 # Go to sleep because not enough time has passed since last check.
time.sleep(wait)
else:
tests: allow logging exceptions on url reach check
r5023 pytest.fail(f"Timeout while waiting for URL {url}")
project: added all source files and assets
r1
tests: fixed all tests for python3 BIG changes
r5087 def is_url_reachable(url: str, log_exc: bool = False) -> bool:
project: added all source files and assets
r1 try:
python3: fix urllib usage
r4914 urllib.request.urlopen(url)
except urllib.error.URLError:
tests: allow logging exceptions on url reach check
r5023 if log_exc:
tests: fixed all tests for python3 BIG changes
r5087 log.exception(f'URL `{url}` reach error')
project: added all source files and assets
r1 return False
return True
pytest: Fix the user login function....
r41
Martin Bornhold
tests: Fix a helper method that checks if a repository is available on filesystem....
r486 def repo_on_filesystem(repo_name):
from rhodecode.lib import vcs
from rhodecode.tests import TESTS_TMP_PATH
repo = vcs.get_vcs_instance(
os.path.join(TESTS_TMP_PATH, repo_name), create=False)
return repo is not None
diffs: compare overhaul....
r1259
def commit_change(
fix(tests): fixed the creation of non-linear commits creation in tests...
r5198 repo, filename: bytes, content: bytes, message, vcs_type, parent=None, branch=None, newfile=False):
diffs: compare overhaul....
r1259 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
repo = Repository.get_by_repo_name(repo)
_commit = parent
if not parent:
_commit = EmptyCommit(alias=vcs_type)
if newfile:
nodes = {
filename: {
'content': content
}
}
commit = ScmModel().create_nodes(
user=TEST_USER_ADMIN_LOGIN, repo=repo,
message=message,
nodes=nodes,
parent_commit=_commit,
pytest: restructure fixtures/plugins to fix problems with pkg_resources and Can't perform this operation for unregistered loader type errors....
r4986 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
diffs: compare overhaul....
r1259 )
else:
commit = ScmModel().commit_change(
repo=repo.scm_instance(), repo_name=repo.repo_name,
commit=parent, user=TEST_USER_ADMIN_LOGIN,
pytest: restructure fixtures/plugins to fix problems with pkg_resources and Can't perform this operation for unregistered loader type errors....
r4986 author=f'{TEST_USER_ADMIN_LOGIN} <admin@rhodecode.com>',
diffs: compare overhaul....
r1259 message=message,
content=content,
fix(tests): fixed the creation of non-linear commits creation in tests...
r5198 f_path=filename,
branch=branch
diffs: compare overhaul....
r1259 )
return commit
tests: added tests for permission update views to catch obvious form errors.
r2827
def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
if not default:
raise ValueError('Permission for default user must be given')
form_data = [(
'csrf_token', csrf_token
)]
# add default
form_data.extend([
('u_perm_1', default)
])
if grant:
for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
form_data.extend([
('perm_new_member_perm_new{}'.format(cnt), perm),
('perm_new_member_id_new{}'.format(cnt), obj_id),
('perm_new_member_name_new{}'.format(cnt), obj_name),
('perm_new_member_type_new{}'.format(cnt), obj_type),
])
if revoke:
for obj_id, obj_type in revoke:
form_data.extend([
('perm_del_member_id_{}'.format(obj_id), obj_id),
('perm_del_member_type_{}'.format(obj_id), obj_type),
])
return form_data