__init__.py
224 lines
| 6.9 KiB
| text/x-python
|
PythonLexer
r5607 | # Copyright (C) 2010-2024 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import time | ||||
import logging | ||||
import datetime | ||||
import tempfile | ||||
from os.path import join as jn | ||||
r4994 | import urllib.parse | |||
r1 | ||||
import pytest | ||||
r5356 | import rhodecode | |||
r1 | from rhodecode.model.db import User | |||
from rhodecode.lib import auth | ||||
r1941 | from rhodecode.lib import helpers as h | |||
r4614 | from rhodecode.lib.helpers import flash | |||
r4994 | from rhodecode.lib.str_utils import safe_str | |||
from rhodecode.lib.hash_utils import sha1_safe | ||||
r5618 | from rhodecode.bootstrap import \ | |||
TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, TEST_USER_ADMIN_EMAIL, \ | ||||
TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, TEST_USER_REGULAR_EMAIL, \ | ||||
TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR2_EMAIL,\ | ||||
HG_REPO, GIT_REPO, SVN_REPO,\ | ||||
NEW_HG_REPO, NEW_GIT_REPO,\ | ||||
HG_FORK, GIT_FORK | ||||
r1 | ||||
log = logging.getLogger(__name__) | ||||
# SOME GLOBALS FOR TESTS | ||||
TEST_DIR = tempfile.gettempdir() | ||||
## VCS | ||||
SCM_TESTS = ['hg', 'git'] | ||||
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple()))) | ||||
r5357 | TESTS_TMP_PATH = tempfile.mkdtemp(prefix='rc_test_', dir=TEST_DIR) | |||
r1 | TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO) | |||
r4994 | TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcsgitclone{uniq_suffix}') | |||
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, f'vcsgitpull{uniq_suffix}') | ||||
r1 | ||||
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO) | ||||
r4994 | TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcshgclone{uniq_suffix}') | |||
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, f'vcshgpull{uniq_suffix}') | ||||
r1 | ||||
TEST_REPO_PREFIX = 'vcs-test' | ||||
r2883 | def clear_cache_regions(regions=None): | |||
# dogpile | ||||
from rhodecode.lib.rc_cache import region_meta | ||||
for region_name, region in region_meta.dogpile_cache_regions.items(): | ||||
if not regions or region_name in regions: | ||||
region.invalidate() | ||||
r1 | ||||
def get_new_dir(title): | ||||
""" | ||||
Returns always new directory path. | ||||
""" | ||||
from rhodecode.tests.vcs.utils import get_normalized_path | ||||
name_parts = [TEST_REPO_PREFIX] | ||||
if title: | ||||
name_parts.append(title) | ||||
r4994 | hex_str = sha1_safe(f'{os.getpid()} {time.time()}') | |||
r1 | name_parts.append(hex_str) | |||
name = '-'.join(name_parts) | ||||
r5356 | path = jn(TEST_DIR, name) | |||
r1 | return get_normalized_path(path) | |||
r2810 | def repo_id_generator(name): | |||
numeric_hash = 0 | ||||
for char in name: | ||||
numeric_hash += (ord(char)) | ||||
return numeric_hash | ||||
r1 | @pytest.mark.usefixtures('app', 'index_location') | |||
class TestController(object): | ||||
maxDiff = None | ||||
def log_user(self, username=TEST_USER_ADMIN_LOGIN, | ||||
password=TEST_USER_ADMIN_PASS): | ||||
self._logged_username = username | ||||
self._session = login_user_session(self.app, username, password) | ||||
self.csrf_token = auth.get_csrf_token(self._session) | ||||
return self._session['rhodecode_user'] | ||||
def logout_user(self): | ||||
logout_user_session(self.app, auth.get_csrf_token(self._session)) | ||||
self.csrf_token = None | ||||
self._logged_username = None | ||||
self._session = None | ||||
def _get_logged_user(self): | ||||
return User.get_by_username(self._logged_username) | ||||
def login_user_session( | ||||
app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS): | ||||
r1941 | ||||
r40 | response = app.post( | |||
r1941 | h.route_path('login'), | |||
r40 | {'username': username, 'password': password}) | |||
r4994 | if 'invalid user name' in response.text: | |||
pytest.fail(f'could not login using {username} {password}') | ||||
r1 | ||||
assert response.status == '302 Found' | ||||
response = response.follow() | ||||
r41 | assert response.status == '200 OK' | |||
r1 | ||||
r1774 | session = response.get_session_from_response() | |||
r41 | assert 'rhodecode_user' in session | |||
rc_user = session['rhodecode_user'] | ||||
assert rc_user.get('username') == username | ||||
assert rc_user.get('is_authenticated') | ||||
return session | ||||
r1 | ||||
def logout_user_session(app, csrf_token): | ||||
r1941 | app.post(h.route_path('logout'), {'csrf_token': csrf_token}, status=302) | |||
r1 | ||||
def login_user(app, username=TEST_USER_ADMIN_LOGIN, | ||||
password=TEST_USER_ADMIN_PASS): | ||||
return login_user_session(app, username, password)['rhodecode_user'] | ||||
r2095 | def assert_session_flash(response, msg=None, category=None, no_=None): | |||
r1 | """ | |||
Assert on a flash message in the current session. | ||||
r2095 | :param response: Response from give calll, it will contain flash | |||
messages or bound session with them. | ||||
:param msg: The expected message. Will be evaluated if a | ||||
r1 | :class:`LazyString` is passed in. | |||
:param category: Optional. If passed, the message category will be | ||||
checked as well. | ||||
r2095 | :param no_: Optional. If passed, the message will be checked to NOT | |||
be in the flash session | ||||
r1 | """ | |||
r1925 | if msg is None and no_ is None: | |||
raise ValueError("Parameter msg or no_ is required.") | ||||
if msg and no_: | ||||
raise ValueError("Please specify either msg or no_, but not both") | ||||
r1 | ||||
r2095 | session = response.get_session_from_response() | |||
messages = flash.pop_messages(session=session) | ||||
r1097 | msg = _eval_if_lazy(msg) | |||
r2349 | if no_: | |||
r4994 | error_msg = f'unable to detect no_ message `{no_}` in empty flash list' | |||
r2349 | else: | |||
r4994 | error_msg = f'unable to find message `{msg}` in empty flash list' | |||
r2349 | assert messages, error_msg | |||
r1 | message = messages[0] | |||
r1925 | message_text = _eval_if_lazy(message.message) or '' | |||
r1 | ||||
r1925 | if no_: | |||
if no_ in message_text: | ||||
r4994 | msg = f'msg `{no_}` found in session flash.' | |||
r1925 | pytest.fail(safe_str(msg)) | |||
else: | ||||
r5087 | ||||
r1925 | if msg not in message_text: | |||
r4994 | fail_msg = f'msg `{msg}` not found in ' \ | |||
f'session flash: got `{message_text}` (type:{type(message_text)}) instead' | ||||
r1312 | ||||
r1925 | pytest.fail(safe_str(fail_msg)) | |||
r5087 | ||||
r1925 | if category: | |||
assert category == message.category | ||||
r1 | ||||
def _eval_if_lazy(value): | ||||
return value.eval() if hasattr(value, 'eval') else value | ||||
r1720 | def no_newline_id_generator(test_name): | |||
""" | ||||
Generates a test name without spaces or newlines characters. Used for | ||||
nicer output of progress of test | ||||
""" | ||||
r4994 | ||||
r3098 | test_name = safe_str(test_name)\ | |||
r1720 | .replace('\n', '_N') \ | |||
r1930 | .replace('\r', '_N') \ | |||
r1720 | .replace('\t', '_T') \ | |||
.replace(' ', '_S') | ||||
return test_name or 'test-with-empty-name' | ||||
r4614 | ||||
r5607 | def console_printer(*msg): | |||
print_func = print | ||||
try: | ||||
from rich import print as print_func | ||||
except ImportError: | ||||
pass | ||||
print_func(*msg) | ||||