diff --git a/rhodecode/apps/my_account/tests/test_my_account_simple_views.py b/rhodecode/apps/my_account/tests/test_my_account_simple_views.py --- a/rhodecode/apps/my_account/tests/test_my_account_simple_views.py +++ b/rhodecode/apps/my_account/tests/test_my_account_simple_views.py @@ -51,7 +51,7 @@ class TestMyAccountSimpleViews(TestContr Repository.user == User.get_by_username( TEST_USER_ADMIN_LOGIN)).all() for repo in repos: - response.mustcontain('"name_raw": "%s"' % repo.repo_name) + response.mustcontain(f'"name_raw":"{repo.repo_name}"') def test_my_account_my_watched(self, autologin_user): response = self.app.get(route_path('my_account_watched')) @@ -60,8 +60,7 @@ class TestMyAccountSimpleViews(TestContr UserFollowing.user == User.get_by_username( TEST_USER_ADMIN_LOGIN)).all() for repo in repos: - response.mustcontain( - '"name_raw": "%s"' % repo.follows_repository.repo_name) + response.mustcontain(f'"name_raw":"{repo.follows_repository.repo_name}"') def test_my_account_perms(self, autologin_user): response = self.app.get(route_path('my_account_perms')) diff --git a/rhodecode/apps/repository/tests/test_repo_changelog.py b/rhodecode/apps/repository/tests/test_repo_changelog.py --- a/rhodecode/apps/repository/tests/test_repo_changelog.py +++ b/rhodecode/apps/repository/tests/test_repo_changelog.py @@ -107,7 +107,7 @@ class TestChangelogController(TestContro assert expected_url in response.location response = response.follow() expected_warning = 'Branch {} is not found.'.format(branch) - assert expected_warning in response.body + assert expected_warning in response.text @pytest.mark.xfail_backends("svn", reason="Depends on branch support") def test_changelog_filtered_by_branch_with_merges( diff --git a/rhodecode/apps/repository/tests/test_repo_commits.py b/rhodecode/apps/repository/tests/test_repo_commits.py --- a/rhodecode/apps/repository/tests/test_repo_commits.py +++ b/rhodecode/apps/repository/tests/test_repo_commits.py @@ -59,20 +59,20 @@ class TestRepoCommitView(object): response = self.app.get(route_path( 'repo_commit_raw', repo_name=backend.repo_name, commit_id=commit_id)) - assert response.body == self.diffs[backend.alias] + assert response.text == self.diffs[backend.alias] def test_show_raw_patch(self, backend): response = self.app.get(route_path( 'repo_commit_patch', repo_name=backend.repo_name, commit_id=self.commit_id[backend.alias])) - assert response.body == self.patches[backend.alias] + assert response.text == self.patches[backend.alias] def test_commit_download(self, backend): response = self.app.get(route_path( 'repo_commit_download', repo_name=backend.repo_name, commit_id=self.commit_id[backend.alias])) - assert response.body == self.diffs[backend.alias] + assert response.text == self.diffs[backend.alias] def test_single_commit_page_different_ops(self, backend): commit_id = { diff --git a/rhodecode/apps/repository/tests/test_repo_compare.py b/rhodecode/apps/repository/tests/test_repo_compare.py --- a/rhodecode/apps/repository/tests/test_repo_compare.py +++ b/rhodecode/apps/repository/tests/test_repo_compare.py @@ -600,7 +600,7 @@ class TestCompareControllerSvn(object): status=200) # It should show commits - assert 'No commits in this compare' not in response.body + assert 'No commits in this compare' not in response.text # Should find only one file changed when comparing those two tags response.mustcontain('example.py') diff --git a/rhodecode/apps/repository/tests/test_repo_files.py b/rhodecode/apps/repository/tests/test_repo_files.py --- a/rhodecode/apps/repository/tests/test_repo_files.py +++ b/rhodecode/apps/repository/tests/test_repo_files.py @@ -398,7 +398,7 @@ class TestFilesViews(object): repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='README.rst'), extra_environ=xhr_header) - assert response.body == '' + assert response.text == '' def test_nodetree_wrong_path(self, backend, xhr_header): commit = backend.repo.get_commit(commit_idx=173) @@ -410,7 +410,7 @@ class TestFilesViews(object): err = 'error: There is no file nor ' \ 'directory at the given path' - assert err in response.body + assert err in response.text def test_nodetree_missing_xhr(self, backend): self.app.get( diff --git a/rhodecode/apps/repository/tests/test_repo_issue_tracker.py b/rhodecode/apps/repository/tests/test_repo_issue_tracker.py --- a/rhodecode/apps/repository/tests/test_repo_issue_tracker.py +++ b/rhodecode/apps/repository/tests/test_repo_issue_tracker.py @@ -83,7 +83,7 @@ class TestRepoIssueTracker(object): repo_name=backend.repo.repo_name), extra_environ=xhr_header, params=data) - assert response.body == \ + assert response.text == \ 'example of prefix replacement' @request.addfinalizer diff --git a/rhodecode/apps/repository/tests/test_repo_pullrequests.py b/rhodecode/apps/repository/tests/test_repo_pullrequests.py --- a/rhodecode/apps/repository/tests/test_repo_pullrequests.py +++ b/rhodecode/apps/repository/tests/test_repo_pullrequests.py @@ -1644,7 +1644,7 @@ class TestPullrequestsControllerDelete(o params={'csrf_token': csrf_token}, status=200 ) - assert response.body == 'true' + assert response.text == 'true' @pytest.mark.parametrize('url_type', [ 'pullrequest_new', diff --git a/rhodecode/tests/__init__.py b/rhodecode/tests/__init__.py --- a/rhodecode/tests/__init__.py +++ b/rhodecode/tests/__init__.py @@ -22,11 +22,9 @@ import os import time import logging import datetime -import hashlib import tempfile from os.path import join as jn - -from tempfile import _RandomNameSequence +import urllib.parse import pytest @@ -34,8 +32,8 @@ from rhodecode.model.db import User from rhodecode.lib import auth from rhodecode.lib import helpers as h from rhodecode.lib.helpers import flash -from rhodecode.lib.utils2 import safe_str - +from rhodecode.lib.str_utils import safe_str +from rhodecode.lib.hash_utils import sha1_safe log = logging.getLogger(__name__) @@ -57,7 +55,7 @@ log = logging.getLogger(__name__) # SOME GLOBALS FOR TESTS TEST_DIR = tempfile.gettempdir() -TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_{}'.format(next(_RandomNameSequence()))) +TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_{}'.format(next(tempfile._RandomNameSequence()))) TEST_USER_ADMIN_LOGIN = 'test_admin' TEST_USER_ADMIN_PASS = 'test12' TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com' @@ -85,12 +83,12 @@ SCM_TESTS = ['hg', 'git'] uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple()))) TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO) -TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix) -TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix) +TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcsgitclone{uniq_suffix}') +TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, f'vcsgitpull{uniq_suffix}') TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO) -TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix) -TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix) +TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcshgclone{uniq_suffix}') +TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, f'vcshgpull{uniq_suffix}') TEST_REPO_PREFIX = 'vcs-test' @@ -111,7 +109,7 @@ def get_new_dir(title): name_parts = [TEST_REPO_PREFIX] if title: name_parts.append(title) - hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest() + hex_str = sha1_safe(f'{os.getpid()} {time.time()}') name_parts.append(hex_str) name = '-'.join(name_parts) path = os.path.join(TEST_DIR, name) @@ -154,8 +152,8 @@ def login_user_session( response = app.post( h.route_path('login'), {'username': username, 'password': password}) - if 'invalid user name' in response.body: - pytest.fail('could not login using %s %s' % (username, password)) + if 'invalid user name' in response.text: + pytest.fail(f'could not login using {username} {password}') assert response.status == '302 Found' response = response.follow() @@ -203,9 +201,9 @@ def assert_session_flash(response, msg=N msg = _eval_if_lazy(msg) if no_: - error_msg = 'unable to detect no_ message `%s` in empty flash list' % no_ + error_msg = f'unable to detect no_ message `{no_}` in empty flash list' else: - error_msg = 'unable to find message `%s` in empty flash list' % msg + error_msg = f'unable to find message `{msg}` in empty flash list' assert messages, error_msg message = messages[0] @@ -213,13 +211,12 @@ def assert_session_flash(response, msg=N if no_: if no_ in message_text: - msg = u'msg `%s` found in session flash.' % (no_,) + msg = f'msg `{no_}` found in session flash.' pytest.fail(safe_str(msg)) else: if msg not in message_text: - fail_msg = u'msg `%s` not found in session ' \ - u'flash: got `%s` (type:%s) instead' % ( - msg, message_text, type(message_text)) + fail_msg = f'msg `{msg}` not found in ' \ + f'session flash: got `{message_text}` (type:{type(message_text)}) instead' pytest.fail(safe_str(fail_msg)) if category: @@ -235,7 +232,7 @@ def no_newline_id_generator(test_name): Generates a test name without spaces or newlines characters. Used for nicer output of progress of test """ - org_name = test_name + test_name = safe_str(test_name)\ .replace('\n', '_N') \ .replace('\r', '_N') \ @@ -246,7 +243,6 @@ def no_newline_id_generator(test_name): def route_path_generator(url_defs, name, params=None, **kwargs): - import urllib.request, urllib.parse, urllib.error base_url = url_defs[name].format(**kwargs) diff --git a/rhodecode/tests/fixture_mods/fixture_pyramid.py b/rhodecode/tests/fixture_mods/fixture_pyramid.py --- a/rhodecode/tests/fixture_mods/fixture_pyramid.py +++ b/rhodecode/tests/fixture_mods/fixture_pyramid.py @@ -143,7 +143,7 @@ def get_available_port(min_port=40000, m @pytest.fixture(scope='session') def rcserver_port(request): port = get_available_port() - print('Using rhodecode port {}'.format(port)) + print(f'Using rhodecode port {port}') return port @@ -152,7 +152,7 @@ def vcsserver_port(request): port = request.config.getoption('--vcsserver-port') if port is None: port = get_available_port() - print('Using vcsserver port {}'.format(port)) + print(f'Using vcsserver port {port}') return port diff --git a/rhodecode/tests/fixture_mods/fixture_utils.py b/rhodecode/tests/fixture_mods/fixture_utils.py --- a/rhodecode/tests/fixture_mods/fixture_utils.py +++ b/rhodecode/tests/fixture_mods/fixture_utils.py @@ -20,7 +20,6 @@ import collections import datetime -import hashlib import os import re import pprint @@ -31,6 +30,7 @@ import time import uuid import dateutil.tz import logging +import functools import mock import pyramid.testing @@ -40,7 +40,7 @@ import requests import pyramid.paster import rhodecode -from rhodecode.lib.utils2 import AttributeDict +import rhodecode.lib from rhodecode.model.changeset_status import ChangesetStatusModel from rhodecode.model.comment import CommentsModel from rhodecode.model.db import ( @@ -57,6 +57,7 @@ from rhodecode.model.integration import from rhodecode.integrations import integration_type_registry from rhodecode.integrations.types.base import IntegrationTypeBase from rhodecode.lib.utils import repo2db_mapper +from rhodecode.lib.hash_utils import sha1_safe from rhodecode.lib.vcs.backends import get_backend from rhodecode.lib.vcs.nodes import FileNode from rhodecode.tests import ( @@ -74,6 +75,7 @@ def cmp(a, b): # backport cmp from python2 so we can still use it in the custom code in this module return (a > b) - (a < b) + @pytest.fixture(scope='session', autouse=True) def activate_example_rcextensions(request): """ @@ -1265,7 +1267,7 @@ class UserUtility(object): return cmp(second_group_parts, first_group_parts) sorted_repo_group_ids = sorted( - self.repo_group_ids, cmp=_repo_group_compare) + self.repo_group_ids, key=functools.cmp_to_key(_repo_group_compare)) for repo_group_id in sorted_repo_group_ids: self.fixture.destroy_repo_group(repo_group_id) @@ -1290,7 +1292,7 @@ class UserUtility(object): return cmp(second_group_parts, first_group_parts) sorted_user_group_ids = sorted( - self.user_group_ids, cmp=_user_group_compare) + self.user_group_ids, key=functools.cmp_to_key(_user_group_compare)) for user_group_id in sorted_user_group_ids: self.fixture.destroy_user_group(user_group_id) @@ -1429,8 +1431,7 @@ class SettingsUtility(object): def create_repo_rhodecode_ui( self, repo, section, value, key=None, active=True, cleanup=True): - key = key or hashlib.sha1( - '{}{}{}'.format(section, value, repo.repo_id)).hexdigest() + key = key or sha1_safe(f'{section}{value}{repo.repo_id}') setting = RepoRhodeCodeUi() setting.repository_id = repo.repo_id @@ -1447,7 +1448,7 @@ class SettingsUtility(object): def create_rhodecode_ui( self, section, value, key=None, active=True, cleanup=True): - key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest() + key = key or sha1_safe(f'{section}{value}') setting = RhodeCodeUi() setting.ui_section = section diff --git a/rhodecode/tests/lib/middleware/test_vcs_unavailable.py b/rhodecode/tests/lib/middleware/test_vcs_unavailable.py --- a/rhodecode/tests/lib/middleware/test_vcs_unavailable.py +++ b/rhodecode/tests/lib/middleware/test_vcs_unavailable.py @@ -27,7 +27,7 @@ def test_vcs_available_returns_summary_p url = '/{repo_name}'.format(repo_name=backend.repo.repo_name) response = app.get(url) assert response.status_code == 200 - assert 'Summary' in response.body + assert 'Summary' in response.text @pytest.mark.usefixtures('autologin_user', 'app') @@ -48,4 +48,4 @@ def test_vcs_unavailable_returns_vcs_err response = app.get(url, expect_errors=True) assert response.status_code == 502 - assert 'Could not connect to VCS Server' in response.body + assert 'Could not connect to VCS Server' in response.text diff --git a/rhodecode/tests/lib/middleware/utils/test_wsgi_app_caller_client.py b/rhodecode/tests/lib/middleware/utils/test_wsgi_app_caller_client.py --- a/rhodecode/tests/lib/middleware/utils/test_wsgi_app_caller_client.py +++ b/rhodecode/tests/lib/middleware/utils/test_wsgi_app_caller_client.py @@ -96,4 +96,4 @@ def test_remote_app_caller(): ('Content-Type', 'text/plain'), ('Content-Length', '7'), ]) - assert response.body == 'content' + assert response.text == 'content' diff --git a/rhodecode/tests/lib/test_encrypt.py b/rhodecode/tests/lib/test_encrypt.py --- a/rhodecode/tests/lib/test_encrypt.py +++ b/rhodecode/tests/lib/test_encrypt.py @@ -21,113 +21,116 @@ import pytest from rhodecode.lib.encrypt import ( - AESCipher, SignatureVerificationError, InvalidDecryptedValue) -from rhodecode.lib.encrypt2 import (Encryptor, InvalidToken) + AESCipher, InvalidDecryptedValue) +from rhodecode.lib import enc_utils +from rhodecode.lib.str_utils import safe_str +from rhodecode.lib.exceptions import SignatureVerificationError + + +@pytest.mark.parametrize( + "algo", ['fernet', 'aes'], +) +@pytest.mark.parametrize( + "key, text", + [ + (b'a', 'short'), + (b'a' * 64, 'too long(trimmed to 32)'), + (b'a' * 32, 'just enough'), + ('ąćęćę', 'non asci'), + ('$asa$asa', 'special $ used'), + ] +) +@pytest.mark.parametrize( + "strict_mode", [True, False], +) +def test_common_encryption_module(algo, key, text, strict_mode): + encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key) + decrypted = enc_utils.decrypt_value(encrypted, algo=algo, enc_key=key, strict_mode=strict_mode) + assert text == safe_str(decrypted) + + +@pytest.mark.parametrize( + "algo", ['fernet', 'aes'], +) +def test_encryption_with_bad_key(algo): + key = b'secretstring' + text = b'ihatemysql' + + encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key) + decrypted = enc_utils.decrypt_value(encrypted, algo=algo, enc_key=b'different-key', strict_mode=False) + + assert decrypted[:22] == '