diff --git a/rhodecode/apps/my_account/tests/test_my_account_simple_views.py b/rhodecode/apps/my_account/tests/test_my_account_simple_views.py
--- a/rhodecode/apps/my_account/tests/test_my_account_simple_views.py
+++ b/rhodecode/apps/my_account/tests/test_my_account_simple_views.py
@@ -51,7 +51,7 @@ class TestMyAccountSimpleViews(TestContr
Repository.user == User.get_by_username(
TEST_USER_ADMIN_LOGIN)).all()
for repo in repos:
- response.mustcontain('"name_raw": "%s"' % repo.repo_name)
+ response.mustcontain(f'"name_raw":"{repo.repo_name}"')
def test_my_account_my_watched(self, autologin_user):
response = self.app.get(route_path('my_account_watched'))
@@ -60,8 +60,7 @@ class TestMyAccountSimpleViews(TestContr
UserFollowing.user == User.get_by_username(
TEST_USER_ADMIN_LOGIN)).all()
for repo in repos:
- response.mustcontain(
- '"name_raw": "%s"' % repo.follows_repository.repo_name)
+ response.mustcontain(f'"name_raw":"{repo.follows_repository.repo_name}"')
def test_my_account_perms(self, autologin_user):
response = self.app.get(route_path('my_account_perms'))
diff --git a/rhodecode/apps/repository/tests/test_repo_changelog.py b/rhodecode/apps/repository/tests/test_repo_changelog.py
--- a/rhodecode/apps/repository/tests/test_repo_changelog.py
+++ b/rhodecode/apps/repository/tests/test_repo_changelog.py
@@ -107,7 +107,7 @@ class TestChangelogController(TestContro
assert expected_url in response.location
response = response.follow()
expected_warning = 'Branch {} is not found.'.format(branch)
- assert expected_warning in response.body
+ assert expected_warning in response.text
@pytest.mark.xfail_backends("svn", reason="Depends on branch support")
def test_changelog_filtered_by_branch_with_merges(
diff --git a/rhodecode/apps/repository/tests/test_repo_commits.py b/rhodecode/apps/repository/tests/test_repo_commits.py
--- a/rhodecode/apps/repository/tests/test_repo_commits.py
+++ b/rhodecode/apps/repository/tests/test_repo_commits.py
@@ -59,20 +59,20 @@ class TestRepoCommitView(object):
response = self.app.get(route_path(
'repo_commit_raw',
repo_name=backend.repo_name, commit_id=commit_id))
- assert response.body == self.diffs[backend.alias]
+ assert response.text == self.diffs[backend.alias]
def test_show_raw_patch(self, backend):
response = self.app.get(route_path(
'repo_commit_patch', repo_name=backend.repo_name,
commit_id=self.commit_id[backend.alias]))
- assert response.body == self.patches[backend.alias]
+ assert response.text == self.patches[backend.alias]
def test_commit_download(self, backend):
response = self.app.get(route_path(
'repo_commit_download',
repo_name=backend.repo_name,
commit_id=self.commit_id[backend.alias]))
- assert response.body == self.diffs[backend.alias]
+ assert response.text == self.diffs[backend.alias]
def test_single_commit_page_different_ops(self, backend):
commit_id = {
diff --git a/rhodecode/apps/repository/tests/test_repo_compare.py b/rhodecode/apps/repository/tests/test_repo_compare.py
--- a/rhodecode/apps/repository/tests/test_repo_compare.py
+++ b/rhodecode/apps/repository/tests/test_repo_compare.py
@@ -600,7 +600,7 @@ class TestCompareControllerSvn(object):
status=200)
# It should show commits
- assert 'No commits in this compare' not in response.body
+ assert 'No commits in this compare' not in response.text
# Should find only one file changed when comparing those two tags
response.mustcontain('example.py')
diff --git a/rhodecode/apps/repository/tests/test_repo_files.py b/rhodecode/apps/repository/tests/test_repo_files.py
--- a/rhodecode/apps/repository/tests/test_repo_files.py
+++ b/rhodecode/apps/repository/tests/test_repo_files.py
@@ -398,7 +398,7 @@ class TestFilesViews(object):
repo_name=backend.repo_name,
commit_id=commit.raw_id, f_path='README.rst'),
extra_environ=xhr_header)
- assert response.body == ''
+ assert response.text == ''
def test_nodetree_wrong_path(self, backend, xhr_header):
commit = backend.repo.get_commit(commit_idx=173)
@@ -410,7 +410,7 @@ class TestFilesViews(object):
err = 'error: There is no file nor ' \
'directory at the given path'
- assert err in response.body
+ assert err in response.text
def test_nodetree_missing_xhr(self, backend):
self.app.get(
diff --git a/rhodecode/apps/repository/tests/test_repo_issue_tracker.py b/rhodecode/apps/repository/tests/test_repo_issue_tracker.py
--- a/rhodecode/apps/repository/tests/test_repo_issue_tracker.py
+++ b/rhodecode/apps/repository/tests/test_repo_issue_tracker.py
@@ -83,7 +83,7 @@ class TestRepoIssueTracker(object):
repo_name=backend.repo.repo_name),
extra_environ=xhr_header, params=data)
- assert response.body == \
+ assert response.text == \
'example of prefix replacement'
@request.addfinalizer
diff --git a/rhodecode/apps/repository/tests/test_repo_pullrequests.py b/rhodecode/apps/repository/tests/test_repo_pullrequests.py
--- a/rhodecode/apps/repository/tests/test_repo_pullrequests.py
+++ b/rhodecode/apps/repository/tests/test_repo_pullrequests.py
@@ -1644,7 +1644,7 @@ class TestPullrequestsControllerDelete(o
params={'csrf_token': csrf_token},
status=200
)
- assert response.body == 'true'
+ assert response.text == 'true'
@pytest.mark.parametrize('url_type', [
'pullrequest_new',
diff --git a/rhodecode/tests/__init__.py b/rhodecode/tests/__init__.py
--- a/rhodecode/tests/__init__.py
+++ b/rhodecode/tests/__init__.py
@@ -22,11 +22,9 @@ import os
import time
import logging
import datetime
-import hashlib
import tempfile
from os.path import join as jn
-
-from tempfile import _RandomNameSequence
+import urllib.parse
import pytest
@@ -34,8 +32,8 @@ from rhodecode.model.db import User
from rhodecode.lib import auth
from rhodecode.lib import helpers as h
from rhodecode.lib.helpers import flash
-from rhodecode.lib.utils2 import safe_str
-
+from rhodecode.lib.str_utils import safe_str
+from rhodecode.lib.hash_utils import sha1_safe
log = logging.getLogger(__name__)
@@ -57,7 +55,7 @@ log = logging.getLogger(__name__)
# SOME GLOBALS FOR TESTS
TEST_DIR = tempfile.gettempdir()
-TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_{}'.format(next(_RandomNameSequence())))
+TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_{}'.format(next(tempfile._RandomNameSequence())))
TEST_USER_ADMIN_LOGIN = 'test_admin'
TEST_USER_ADMIN_PASS = 'test12'
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
@@ -85,12 +83,12 @@ SCM_TESTS = ['hg', 'git']
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
-TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
-TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
+TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcsgitclone{uniq_suffix}')
+TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, f'vcsgitpull{uniq_suffix}')
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
-TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
-TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
+TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcshgclone{uniq_suffix}')
+TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, f'vcshgpull{uniq_suffix}')
TEST_REPO_PREFIX = 'vcs-test'
@@ -111,7 +109,7 @@ def get_new_dir(title):
name_parts = [TEST_REPO_PREFIX]
if title:
name_parts.append(title)
- hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
+ hex_str = sha1_safe(f'{os.getpid()} {time.time()}')
name_parts.append(hex_str)
name = '-'.join(name_parts)
path = os.path.join(TEST_DIR, name)
@@ -154,8 +152,8 @@ def login_user_session(
response = app.post(
h.route_path('login'),
{'username': username, 'password': password})
- if 'invalid user name' in response.body:
- pytest.fail('could not login using %s %s' % (username, password))
+ if 'invalid user name' in response.text:
+ pytest.fail(f'could not login using {username} {password}')
assert response.status == '302 Found'
response = response.follow()
@@ -203,9 +201,9 @@ def assert_session_flash(response, msg=N
msg = _eval_if_lazy(msg)
if no_:
- error_msg = 'unable to detect no_ message `%s` in empty flash list' % no_
+ error_msg = f'unable to detect no_ message `{no_}` in empty flash list'
else:
- error_msg = 'unable to find message `%s` in empty flash list' % msg
+ error_msg = f'unable to find message `{msg}` in empty flash list'
assert messages, error_msg
message = messages[0]
@@ -213,13 +211,12 @@ def assert_session_flash(response, msg=N
if no_:
if no_ in message_text:
- msg = u'msg `%s` found in session flash.' % (no_,)
+ msg = f'msg `{no_}` found in session flash.'
pytest.fail(safe_str(msg))
else:
if msg not in message_text:
- fail_msg = u'msg `%s` not found in session ' \
- u'flash: got `%s` (type:%s) instead' % (
- msg, message_text, type(message_text))
+ fail_msg = f'msg `{msg}` not found in ' \
+ f'session flash: got `{message_text}` (type:{type(message_text)}) instead'
pytest.fail(safe_str(fail_msg))
if category:
@@ -235,7 +232,7 @@ def no_newline_id_generator(test_name):
Generates a test name without spaces or newlines characters. Used for
nicer output of progress of test
"""
- org_name = test_name
+
test_name = safe_str(test_name)\
.replace('\n', '_N') \
.replace('\r', '_N') \
@@ -246,7 +243,6 @@ def no_newline_id_generator(test_name):
def route_path_generator(url_defs, name, params=None, **kwargs):
- import urllib.request, urllib.parse, urllib.error
base_url = url_defs[name].format(**kwargs)
diff --git a/rhodecode/tests/fixture_mods/fixture_pyramid.py b/rhodecode/tests/fixture_mods/fixture_pyramid.py
--- a/rhodecode/tests/fixture_mods/fixture_pyramid.py
+++ b/rhodecode/tests/fixture_mods/fixture_pyramid.py
@@ -143,7 +143,7 @@ def get_available_port(min_port=40000, m
@pytest.fixture(scope='session')
def rcserver_port(request):
port = get_available_port()
- print('Using rhodecode port {}'.format(port))
+ print(f'Using rhodecode port {port}')
return port
@@ -152,7 +152,7 @@ def vcsserver_port(request):
port = request.config.getoption('--vcsserver-port')
if port is None:
port = get_available_port()
- print('Using vcsserver port {}'.format(port))
+ print(f'Using vcsserver port {port}')
return port
diff --git a/rhodecode/tests/fixture_mods/fixture_utils.py b/rhodecode/tests/fixture_mods/fixture_utils.py
--- a/rhodecode/tests/fixture_mods/fixture_utils.py
+++ b/rhodecode/tests/fixture_mods/fixture_utils.py
@@ -20,7 +20,6 @@
import collections
import datetime
-import hashlib
import os
import re
import pprint
@@ -31,6 +30,7 @@ import time
import uuid
import dateutil.tz
import logging
+import functools
import mock
import pyramid.testing
@@ -40,7 +40,7 @@ import requests
import pyramid.paster
import rhodecode
-from rhodecode.lib.utils2 import AttributeDict
+import rhodecode.lib
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.comment import CommentsModel
from rhodecode.model.db import (
@@ -57,6 +57,7 @@ from rhodecode.model.integration import
from rhodecode.integrations import integration_type_registry
from rhodecode.integrations.types.base import IntegrationTypeBase
from rhodecode.lib.utils import repo2db_mapper
+from rhodecode.lib.hash_utils import sha1_safe
from rhodecode.lib.vcs.backends import get_backend
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.tests import (
@@ -74,6 +75,7 @@ def cmp(a, b):
# backport cmp from python2 so we can still use it in the custom code in this module
return (a > b) - (a < b)
+
@pytest.fixture(scope='session', autouse=True)
def activate_example_rcextensions(request):
"""
@@ -1265,7 +1267,7 @@ class UserUtility(object):
return cmp(second_group_parts, first_group_parts)
sorted_repo_group_ids = sorted(
- self.repo_group_ids, cmp=_repo_group_compare)
+ self.repo_group_ids, key=functools.cmp_to_key(_repo_group_compare))
for repo_group_id in sorted_repo_group_ids:
self.fixture.destroy_repo_group(repo_group_id)
@@ -1290,7 +1292,7 @@ class UserUtility(object):
return cmp(second_group_parts, first_group_parts)
sorted_user_group_ids = sorted(
- self.user_group_ids, cmp=_user_group_compare)
+ self.user_group_ids, key=functools.cmp_to_key(_user_group_compare))
for user_group_id in sorted_user_group_ids:
self.fixture.destroy_user_group(user_group_id)
@@ -1429,8 +1431,7 @@ class SettingsUtility(object):
def create_repo_rhodecode_ui(
self, repo, section, value, key=None, active=True, cleanup=True):
- key = key or hashlib.sha1(
- '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
+ key = key or sha1_safe(f'{section}{value}{repo.repo_id}')
setting = RepoRhodeCodeUi()
setting.repository_id = repo.repo_id
@@ -1447,7 +1448,7 @@ class SettingsUtility(object):
def create_rhodecode_ui(
self, section, value, key=None, active=True, cleanup=True):
- key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
+ key = key or sha1_safe(f'{section}{value}')
setting = RhodeCodeUi()
setting.ui_section = section
diff --git a/rhodecode/tests/lib/middleware/test_vcs_unavailable.py b/rhodecode/tests/lib/middleware/test_vcs_unavailable.py
--- a/rhodecode/tests/lib/middleware/test_vcs_unavailable.py
+++ b/rhodecode/tests/lib/middleware/test_vcs_unavailable.py
@@ -27,7 +27,7 @@ def test_vcs_available_returns_summary_p
url = '/{repo_name}'.format(repo_name=backend.repo.repo_name)
response = app.get(url)
assert response.status_code == 200
- assert 'Summary' in response.body
+ assert 'Summary' in response.text
@pytest.mark.usefixtures('autologin_user', 'app')
@@ -48,4 +48,4 @@ def test_vcs_unavailable_returns_vcs_err
response = app.get(url, expect_errors=True)
assert response.status_code == 502
- assert 'Could not connect to VCS Server' in response.body
+ assert 'Could not connect to VCS Server' in response.text
diff --git a/rhodecode/tests/lib/middleware/utils/test_wsgi_app_caller_client.py b/rhodecode/tests/lib/middleware/utils/test_wsgi_app_caller_client.py
--- a/rhodecode/tests/lib/middleware/utils/test_wsgi_app_caller_client.py
+++ b/rhodecode/tests/lib/middleware/utils/test_wsgi_app_caller_client.py
@@ -96,4 +96,4 @@ def test_remote_app_caller():
('Content-Type', 'text/plain'),
('Content-Length', '7'),
])
- assert response.body == 'content'
+ assert response.text == 'content'
diff --git a/rhodecode/tests/lib/test_encrypt.py b/rhodecode/tests/lib/test_encrypt.py
--- a/rhodecode/tests/lib/test_encrypt.py
+++ b/rhodecode/tests/lib/test_encrypt.py
@@ -21,113 +21,116 @@
import pytest
from rhodecode.lib.encrypt import (
- AESCipher, SignatureVerificationError, InvalidDecryptedValue)
-from rhodecode.lib.encrypt2 import (Encryptor, InvalidToken)
+ AESCipher, InvalidDecryptedValue)
+from rhodecode.lib import enc_utils
+from rhodecode.lib.str_utils import safe_str
+from rhodecode.lib.exceptions import SignatureVerificationError
+
+
+@pytest.mark.parametrize(
+ "algo", ['fernet', 'aes'],
+)
+@pytest.mark.parametrize(
+ "key, text",
+ [
+ (b'a', 'short'),
+ (b'a' * 64, 'too long(trimmed to 32)'),
+ (b'a' * 32, 'just enough'),
+ ('ąćęćę', 'non asci'),
+ ('$asa$asa', 'special $ used'),
+ ]
+)
+@pytest.mark.parametrize(
+ "strict_mode", [True, False],
+)
+def test_common_encryption_module(algo, key, text, strict_mode):
+ encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key)
+ decrypted = enc_utils.decrypt_value(encrypted, algo=algo, enc_key=key, strict_mode=strict_mode)
+ assert text == safe_str(decrypted)
+
+
+@pytest.mark.parametrize(
+ "algo", ['fernet', 'aes'],
+)
+def test_encryption_with_bad_key(algo):
+ key = b'secretstring'
+ text = b'ihatemysql'
+
+ encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key)
+ decrypted = enc_utils.decrypt_value(encrypted, algo=algo, enc_key=b'different-key', strict_mode=False)
+
+ assert decrypted[:22] == '