##// END OF EJS Templates
tests: multiple tests cases fixes for python3
super-admin -
r4994:4e9283a1 default
parent child Browse files
Show More
@@ -51,7 +51,7 b' class TestMyAccountSimpleViews(TestContr'
51 Repository.user == User.get_by_username(
51 Repository.user == User.get_by_username(
52 TEST_USER_ADMIN_LOGIN)).all()
52 TEST_USER_ADMIN_LOGIN)).all()
53 for repo in repos:
53 for repo in repos:
54 response.mustcontain('"name_raw": "%s"' % repo.repo_name)
54 response.mustcontain(f'"name_raw":"{repo.repo_name}"')
55
55
56 def test_my_account_my_watched(self, autologin_user):
56 def test_my_account_my_watched(self, autologin_user):
57 response = self.app.get(route_path('my_account_watched'))
57 response = self.app.get(route_path('my_account_watched'))
@@ -60,8 +60,7 b' class TestMyAccountSimpleViews(TestContr'
60 UserFollowing.user == User.get_by_username(
60 UserFollowing.user == User.get_by_username(
61 TEST_USER_ADMIN_LOGIN)).all()
61 TEST_USER_ADMIN_LOGIN)).all()
62 for repo in repos:
62 for repo in repos:
63 response.mustcontain(
63 response.mustcontain(f'"name_raw":"{repo.follows_repository.repo_name}"')
64 '"name_raw": "%s"' % repo.follows_repository.repo_name)
65
64
66 def test_my_account_perms(self, autologin_user):
65 def test_my_account_perms(self, autologin_user):
67 response = self.app.get(route_path('my_account_perms'))
66 response = self.app.get(route_path('my_account_perms'))
@@ -107,7 +107,7 b' class TestChangelogController(TestContro'
107 assert expected_url in response.location
107 assert expected_url in response.location
108 response = response.follow()
108 response = response.follow()
109 expected_warning = 'Branch {} is not found.'.format(branch)
109 expected_warning = 'Branch {} is not found.'.format(branch)
110 assert expected_warning in response.body
110 assert expected_warning in response.text
111
111
112 @pytest.mark.xfail_backends("svn", reason="Depends on branch support")
112 @pytest.mark.xfail_backends("svn", reason="Depends on branch support")
113 def test_changelog_filtered_by_branch_with_merges(
113 def test_changelog_filtered_by_branch_with_merges(
@@ -59,20 +59,20 b' class TestRepoCommitView(object):'
59 response = self.app.get(route_path(
59 response = self.app.get(route_path(
60 'repo_commit_raw',
60 'repo_commit_raw',
61 repo_name=backend.repo_name, commit_id=commit_id))
61 repo_name=backend.repo_name, commit_id=commit_id))
62 assert response.body == self.diffs[backend.alias]
62 assert response.text == self.diffs[backend.alias]
63
63
64 def test_show_raw_patch(self, backend):
64 def test_show_raw_patch(self, backend):
65 response = self.app.get(route_path(
65 response = self.app.get(route_path(
66 'repo_commit_patch', repo_name=backend.repo_name,
66 'repo_commit_patch', repo_name=backend.repo_name,
67 commit_id=self.commit_id[backend.alias]))
67 commit_id=self.commit_id[backend.alias]))
68 assert response.body == self.patches[backend.alias]
68 assert response.text == self.patches[backend.alias]
69
69
70 def test_commit_download(self, backend):
70 def test_commit_download(self, backend):
71 response = self.app.get(route_path(
71 response = self.app.get(route_path(
72 'repo_commit_download',
72 'repo_commit_download',
73 repo_name=backend.repo_name,
73 repo_name=backend.repo_name,
74 commit_id=self.commit_id[backend.alias]))
74 commit_id=self.commit_id[backend.alias]))
75 assert response.body == self.diffs[backend.alias]
75 assert response.text == self.diffs[backend.alias]
76
76
77 def test_single_commit_page_different_ops(self, backend):
77 def test_single_commit_page_different_ops(self, backend):
78 commit_id = {
78 commit_id = {
@@ -600,7 +600,7 b' class TestCompareControllerSvn(object):'
600 status=200)
600 status=200)
601
601
602 # It should show commits
602 # It should show commits
603 assert 'No commits in this compare' not in response.body
603 assert 'No commits in this compare' not in response.text
604
604
605 # Should find only one file changed when comparing those two tags
605 # Should find only one file changed when comparing those two tags
606 response.mustcontain('example.py')
606 response.mustcontain('example.py')
@@ -398,7 +398,7 b' class TestFilesViews(object):'
398 repo_name=backend.repo_name,
398 repo_name=backend.repo_name,
399 commit_id=commit.raw_id, f_path='README.rst'),
399 commit_id=commit.raw_id, f_path='README.rst'),
400 extra_environ=xhr_header)
400 extra_environ=xhr_header)
401 assert response.body == ''
401 assert response.text == ''
402
402
403 def test_nodetree_wrong_path(self, backend, xhr_header):
403 def test_nodetree_wrong_path(self, backend, xhr_header):
404 commit = backend.repo.get_commit(commit_idx=173)
404 commit = backend.repo.get_commit(commit_idx=173)
@@ -410,7 +410,7 b' class TestFilesViews(object):'
410
410
411 err = 'error: There is no file nor ' \
411 err = 'error: There is no file nor ' \
412 'directory at the given path'
412 'directory at the given path'
413 assert err in response.body
413 assert err in response.text
414
414
415 def test_nodetree_missing_xhr(self, backend):
415 def test_nodetree_missing_xhr(self, backend):
416 self.app.get(
416 self.app.get(
@@ -83,7 +83,7 b' class TestRepoIssueTracker(object):'
83 repo_name=backend.repo.repo_name),
83 repo_name=backend.repo.repo_name),
84 extra_environ=xhr_header, params=data)
84 extra_environ=xhr_header, params=data)
85
85
86 assert response.body == \
86 assert response.text == \
87 'example of <a class="tooltip issue-tracker-link" href="http://url" title="description">prefix</a> replacement'
87 'example of <a class="tooltip issue-tracker-link" href="http://url" title="description">prefix</a> replacement'
88
88
89 @request.addfinalizer
89 @request.addfinalizer
@@ -1644,7 +1644,7 b' class TestPullrequestsControllerDelete(o'
1644 params={'csrf_token': csrf_token},
1644 params={'csrf_token': csrf_token},
1645 status=200
1645 status=200
1646 )
1646 )
1647 assert response.body == 'true'
1647 assert response.text == 'true'
1648
1648
1649 @pytest.mark.parametrize('url_type', [
1649 @pytest.mark.parametrize('url_type', [
1650 'pullrequest_new',
1650 'pullrequest_new',
@@ -22,11 +22,9 b' import os'
22 import time
22 import time
23 import logging
23 import logging
24 import datetime
24 import datetime
25 import hashlib
26 import tempfile
25 import tempfile
27 from os.path import join as jn
26 from os.path import join as jn
28
27 import urllib.parse
29 from tempfile import _RandomNameSequence
30
28
31 import pytest
29 import pytest
32
30
@@ -34,8 +32,8 b' from rhodecode.model.db import User'
34 from rhodecode.lib import auth
32 from rhodecode.lib import auth
35 from rhodecode.lib import helpers as h
33 from rhodecode.lib import helpers as h
36 from rhodecode.lib.helpers import flash
34 from rhodecode.lib.helpers import flash
37 from rhodecode.lib.utils2 import safe_str
35 from rhodecode.lib.str_utils import safe_str
38
36 from rhodecode.lib.hash_utils import sha1_safe
39
37
40 log = logging.getLogger(__name__)
38 log = logging.getLogger(__name__)
41
39
@@ -57,7 +55,7 b' log = logging.getLogger(__name__)'
57 # SOME GLOBALS FOR TESTS
55 # SOME GLOBALS FOR TESTS
58 TEST_DIR = tempfile.gettempdir()
56 TEST_DIR = tempfile.gettempdir()
59
57
60 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_{}'.format(next(_RandomNameSequence())))
58 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_{}'.format(next(tempfile._RandomNameSequence())))
61 TEST_USER_ADMIN_LOGIN = 'test_admin'
59 TEST_USER_ADMIN_LOGIN = 'test_admin'
62 TEST_USER_ADMIN_PASS = 'test12'
60 TEST_USER_ADMIN_PASS = 'test12'
63 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
61 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
@@ -85,12 +83,12 b" SCM_TESTS = ['hg', 'git']"
85 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
83 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
86
84
87 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
85 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
88 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
86 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcsgitclone{uniq_suffix}')
89 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
87 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, f'vcsgitpull{uniq_suffix}')
90
88
91 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
89 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
92 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
90 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcshgclone{uniq_suffix}')
93 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
91 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, f'vcshgpull{uniq_suffix}')
94
92
95 TEST_REPO_PREFIX = 'vcs-test'
93 TEST_REPO_PREFIX = 'vcs-test'
96
94
@@ -111,7 +109,7 b' def get_new_dir(title):'
111 name_parts = [TEST_REPO_PREFIX]
109 name_parts = [TEST_REPO_PREFIX]
112 if title:
110 if title:
113 name_parts.append(title)
111 name_parts.append(title)
114 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
112 hex_str = sha1_safe(f'{os.getpid()} {time.time()}')
115 name_parts.append(hex_str)
113 name_parts.append(hex_str)
116 name = '-'.join(name_parts)
114 name = '-'.join(name_parts)
117 path = os.path.join(TEST_DIR, name)
115 path = os.path.join(TEST_DIR, name)
@@ -154,8 +152,8 b' def login_user_session('
154 response = app.post(
152 response = app.post(
155 h.route_path('login'),
153 h.route_path('login'),
156 {'username': username, 'password': password})
154 {'username': username, 'password': password})
157 if 'invalid user name' in response.body:
155 if 'invalid user name' in response.text:
158 pytest.fail('could not login using %s %s' % (username, password))
156 pytest.fail(f'could not login using {username} {password}')
159
157
160 assert response.status == '302 Found'
158 assert response.status == '302 Found'
161 response = response.follow()
159 response = response.follow()
@@ -203,9 +201,9 b' def assert_session_flash(response, msg=N'
203 msg = _eval_if_lazy(msg)
201 msg = _eval_if_lazy(msg)
204
202
205 if no_:
203 if no_:
206 error_msg = 'unable to detect no_ message `%s` in empty flash list' % no_
204 error_msg = f'unable to detect no_ message `{no_}` in empty flash list'
207 else:
205 else:
208 error_msg = 'unable to find message `%s` in empty flash list' % msg
206 error_msg = f'unable to find message `{msg}` in empty flash list'
209 assert messages, error_msg
207 assert messages, error_msg
210 message = messages[0]
208 message = messages[0]
211
209
@@ -213,13 +211,12 b' def assert_session_flash(response, msg=N'
213
211
214 if no_:
212 if no_:
215 if no_ in message_text:
213 if no_ in message_text:
216 msg = u'msg `%s` found in session flash.' % (no_,)
214 msg = f'msg `{no_}` found in session flash.'
217 pytest.fail(safe_str(msg))
215 pytest.fail(safe_str(msg))
218 else:
216 else:
219 if msg not in message_text:
217 if msg not in message_text:
220 fail_msg = u'msg `%s` not found in session ' \
218 fail_msg = f'msg `{msg}` not found in ' \
221 u'flash: got `%s` (type:%s) instead' % (
219 f'session flash: got `{message_text}` (type:{type(message_text)}) instead'
222 msg, message_text, type(message_text))
223
220
224 pytest.fail(safe_str(fail_msg))
221 pytest.fail(safe_str(fail_msg))
225 if category:
222 if category:
@@ -235,7 +232,7 b' def no_newline_id_generator(test_name):'
235 Generates a test name without spaces or newlines characters. Used for
232 Generates a test name without spaces or newlines characters. Used for
236 nicer output of progress of test
233 nicer output of progress of test
237 """
234 """
238 org_name = test_name
235
239 test_name = safe_str(test_name)\
236 test_name = safe_str(test_name)\
240 .replace('\n', '_N') \
237 .replace('\n', '_N') \
241 .replace('\r', '_N') \
238 .replace('\r', '_N') \
@@ -246,7 +243,6 b' def no_newline_id_generator(test_name):'
246
243
247
244
248 def route_path_generator(url_defs, name, params=None, **kwargs):
245 def route_path_generator(url_defs, name, params=None, **kwargs):
249 import urllib.request, urllib.parse, urllib.error
250
246
251 base_url = url_defs[name].format(**kwargs)
247 base_url = url_defs[name].format(**kwargs)
252
248
@@ -143,7 +143,7 b' def get_available_port(min_port=40000, m'
143 @pytest.fixture(scope='session')
143 @pytest.fixture(scope='session')
144 def rcserver_port(request):
144 def rcserver_port(request):
145 port = get_available_port()
145 port = get_available_port()
146 print('Using rhodecode port {}'.format(port))
146 print(f'Using rhodecode port {port}')
147 return port
147 return port
148
148
149
149
@@ -152,7 +152,7 b' def vcsserver_port(request):'
152 port = request.config.getoption('--vcsserver-port')
152 port = request.config.getoption('--vcsserver-port')
153 if port is None:
153 if port is None:
154 port = get_available_port()
154 port = get_available_port()
155 print('Using vcsserver port {}'.format(port))
155 print(f'Using vcsserver port {port}')
156 return port
156 return port
157
157
158
158
@@ -20,7 +20,6 b''
20
20
21 import collections
21 import collections
22 import datetime
22 import datetime
23 import hashlib
24 import os
23 import os
25 import re
24 import re
26 import pprint
25 import pprint
@@ -31,6 +30,7 b' import time'
31 import uuid
30 import uuid
32 import dateutil.tz
31 import dateutil.tz
33 import logging
32 import logging
33 import functools
34
34
35 import mock
35 import mock
36 import pyramid.testing
36 import pyramid.testing
@@ -40,7 +40,7 b' import requests'
40 import pyramid.paster
40 import pyramid.paster
41
41
42 import rhodecode
42 import rhodecode
43 from rhodecode.lib.utils2 import AttributeDict
43 import rhodecode.lib
44 from rhodecode.model.changeset_status import ChangesetStatusModel
44 from rhodecode.model.changeset_status import ChangesetStatusModel
45 from rhodecode.model.comment import CommentsModel
45 from rhodecode.model.comment import CommentsModel
46 from rhodecode.model.db import (
46 from rhodecode.model.db import (
@@ -57,6 +57,7 b' from rhodecode.model.integration import '
57 from rhodecode.integrations import integration_type_registry
57 from rhodecode.integrations import integration_type_registry
58 from rhodecode.integrations.types.base import IntegrationTypeBase
58 from rhodecode.integrations.types.base import IntegrationTypeBase
59 from rhodecode.lib.utils import repo2db_mapper
59 from rhodecode.lib.utils import repo2db_mapper
60 from rhodecode.lib.hash_utils import sha1_safe
60 from rhodecode.lib.vcs.backends import get_backend
61 from rhodecode.lib.vcs.backends import get_backend
61 from rhodecode.lib.vcs.nodes import FileNode
62 from rhodecode.lib.vcs.nodes import FileNode
62 from rhodecode.tests import (
63 from rhodecode.tests import (
@@ -74,6 +75,7 b' def cmp(a, b):'
74 # backport cmp from python2 so we can still use it in the custom code in this module
75 # backport cmp from python2 so we can still use it in the custom code in this module
75 return (a > b) - (a < b)
76 return (a > b) - (a < b)
76
77
78
77 @pytest.fixture(scope='session', autouse=True)
79 @pytest.fixture(scope='session', autouse=True)
78 def activate_example_rcextensions(request):
80 def activate_example_rcextensions(request):
79 """
81 """
@@ -1265,7 +1267,7 b' class UserUtility(object):'
1265 return cmp(second_group_parts, first_group_parts)
1267 return cmp(second_group_parts, first_group_parts)
1266
1268
1267 sorted_repo_group_ids = sorted(
1269 sorted_repo_group_ids = sorted(
1268 self.repo_group_ids, cmp=_repo_group_compare)
1270 self.repo_group_ids, key=functools.cmp_to_key(_repo_group_compare))
1269 for repo_group_id in sorted_repo_group_ids:
1271 for repo_group_id in sorted_repo_group_ids:
1270 self.fixture.destroy_repo_group(repo_group_id)
1272 self.fixture.destroy_repo_group(repo_group_id)
1271
1273
@@ -1290,7 +1292,7 b' class UserUtility(object):'
1290 return cmp(second_group_parts, first_group_parts)
1292 return cmp(second_group_parts, first_group_parts)
1291
1293
1292 sorted_user_group_ids = sorted(
1294 sorted_user_group_ids = sorted(
1293 self.user_group_ids, cmp=_user_group_compare)
1295 self.user_group_ids, key=functools.cmp_to_key(_user_group_compare))
1294 for user_group_id in sorted_user_group_ids:
1296 for user_group_id in sorted_user_group_ids:
1295 self.fixture.destroy_user_group(user_group_id)
1297 self.fixture.destroy_user_group(user_group_id)
1296
1298
@@ -1429,8 +1431,7 b' class SettingsUtility(object):'
1429
1431
1430 def create_repo_rhodecode_ui(
1432 def create_repo_rhodecode_ui(
1431 self, repo, section, value, key=None, active=True, cleanup=True):
1433 self, repo, section, value, key=None, active=True, cleanup=True):
1432 key = key or hashlib.sha1(
1434 key = key or sha1_safe(f'{section}{value}{repo.repo_id}')
1433 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1434
1435
1435 setting = RepoRhodeCodeUi()
1436 setting = RepoRhodeCodeUi()
1436 setting.repository_id = repo.repo_id
1437 setting.repository_id = repo.repo_id
@@ -1447,7 +1448,7 b' class SettingsUtility(object):'
1447
1448
1448 def create_rhodecode_ui(
1449 def create_rhodecode_ui(
1449 self, section, value, key=None, active=True, cleanup=True):
1450 self, section, value, key=None, active=True, cleanup=True):
1450 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1451 key = key or sha1_safe(f'{section}{value}')
1451
1452
1452 setting = RhodeCodeUi()
1453 setting = RhodeCodeUi()
1453 setting.ui_section = section
1454 setting.ui_section = section
@@ -27,7 +27,7 b' def test_vcs_available_returns_summary_p'
27 url = '/{repo_name}'.format(repo_name=backend.repo.repo_name)
27 url = '/{repo_name}'.format(repo_name=backend.repo.repo_name)
28 response = app.get(url)
28 response = app.get(url)
29 assert response.status_code == 200
29 assert response.status_code == 200
30 assert 'Summary' in response.body
30 assert 'Summary' in response.text
31
31
32
32
33 @pytest.mark.usefixtures('autologin_user', 'app')
33 @pytest.mark.usefixtures('autologin_user', 'app')
@@ -48,4 +48,4 b' def test_vcs_unavailable_returns_vcs_err'
48 response = app.get(url, expect_errors=True)
48 response = app.get(url, expect_errors=True)
49
49
50 assert response.status_code == 502
50 assert response.status_code == 502
51 assert 'Could not connect to VCS Server' in response.body
51 assert 'Could not connect to VCS Server' in response.text
@@ -96,4 +96,4 b' def test_remote_app_caller():'
96 ('Content-Type', 'text/plain'),
96 ('Content-Type', 'text/plain'),
97 ('Content-Length', '7'),
97 ('Content-Length', '7'),
98 ])
98 ])
99 assert response.body == 'content'
99 assert response.text == 'content'
@@ -21,113 +21,116 b''
21 import pytest
21 import pytest
22
22
23 from rhodecode.lib.encrypt import (
23 from rhodecode.lib.encrypt import (
24 AESCipher, SignatureVerificationError, InvalidDecryptedValue)
24 AESCipher, InvalidDecryptedValue)
25 from rhodecode.lib.encrypt2 import (Encryptor, InvalidToken)
25 from rhodecode.lib import enc_utils
26 from rhodecode.lib.str_utils import safe_str
27 from rhodecode.lib.exceptions import SignatureVerificationError
26
28
27
29
28 class TestEncryptModule(object):
30 @pytest.mark.parametrize(
29
31 "algo", ['fernet', 'aes'],
32 )
30 @pytest.mark.parametrize(
33 @pytest.mark.parametrize(
31 "key, text",
34 "key, text",
32 [
35 [
33 ('a', 'short'),
36 (b'a', 'short'),
34 ('a'*64, 'too long(trimmed to 32)'),
37 (b'a' * 64, 'too long(trimmed to 32)'),
35 ('a'*32, 'just enough'),
38 (b'a' * 32, 'just enough'),
36 ('ąćęćę', 'non asci'),
39 ('ąćęćę', 'non asci'),
37 ('$asa$asa', 'special $ used'),
40 ('$asa$asa', 'special $ used'),
38 ]
41 ]
39 )
42 )
40 def test_encryption(self, key, text):
43 @pytest.mark.parametrize(
41 enc = AESCipher(key).encrypt(text)
44 "strict_mode", [True, False],
42 assert AESCipher(key).decrypt(enc) == text
45 )
46 def test_common_encryption_module(algo, key, text, strict_mode):
47 encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key)
48 decrypted = enc_utils.decrypt_value(encrypted, algo=algo, enc_key=key, strict_mode=strict_mode)
49 assert text == safe_str(decrypted)
50
51
52 @pytest.mark.parametrize(
53 "algo", ['fernet', 'aes'],
54 )
55 def test_encryption_with_bad_key(algo):
56 key = b'secretstring'
57 text = b'ihatemysql'
58
59 encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key)
60 decrypted = enc_utils.decrypt_value(encrypted, algo=algo, enc_key=b'different-key', strict_mode=False)
61
62 assert decrypted[:22] == '<InvalidDecryptedValue'
63
64
65 @pytest.mark.parametrize(
66 "algo", ['fernet', 'aes'],
67 )
68 def test_encryption_with_bad_key_raises(algo):
69 key = b'secretstring'
70 text = b'ihatemysql'
71 encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key)
72
73 with pytest.raises(SignatureVerificationError) as e:
74 enc_utils.decrypt_value(encrypted, algo=algo, enc_key=b'different-key', strict_mode=True)
75
76 assert 'InvalidDecryptedValue' in str(e)
43
77
44 def test_encryption_with_hmac(self):
78
45 key = 'secret'
79 @pytest.mark.parametrize(
46 text = 'ihatemysql'
80 "algo", ['fernet', 'aes'],
81 )
82 def test_encryption_with_bad_format_data(algo):
83 key = b'secret'
84 text = b'ihatemysql'
85 encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key)
86 encrypted = b'$xyz' + encrypted[3:]
87
88 with pytest.raises(ValueError) as e:
89 enc_utils.decrypt_value(encrypted, algo=algo, enc_key=key, strict_mode=True)
90
91 assert 'Encrypted Data has invalid format' in str(e)
92
93
94 @pytest.mark.parametrize(
95 "algo", ['fernet', 'aes'],
96 )
97 def test_encryption_with_bad_data(algo):
98 key = b'secret'
99 text = b'ihatemysql'
100 encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key)
101 encrypted = encrypted[:-5]
102
103 with pytest.raises(SignatureVerificationError) as e:
104 enc_utils.decrypt_value(encrypted, algo=algo, enc_key=key, strict_mode=True)
105
106 assert 'SignatureVerificationError' in str(e)
107
108
109 def test_encryption_with_hmac():
110 key = b'secret'
111 text = b'ihatemysql'
47 enc = AESCipher(key, hmac=True).encrypt(text)
112 enc = AESCipher(key, hmac=True).encrypt(text)
48 assert AESCipher(key, hmac=True).decrypt(enc) == text
113 assert AESCipher(key, hmac=True).decrypt(enc) == text
49
114
50 def test_encryption_with_hmac_with_bad_key(self):
51 key = 'secretstring'
52 text = 'ihatemysql'
53 enc = AESCipher(key, hmac=True).encrypt(text)
54
115
55 with pytest.raises(SignatureVerificationError) as e:
116 def test_encryption_with_hmac_with_bad_data():
56 assert AESCipher('differentsecret', hmac=True).decrypt(enc) == ''
117 key = b'secret'
57
118 text = b'ihatemysql'
58 assert 'Encryption signature verification failed' in str(e)
59
60 def test_encryption_with_hmac_with_bad_data(self):
61 key = 'secret'
62 text = 'ihatemysql'
63 enc = AESCipher(key, hmac=True).encrypt(text)
119 enc = AESCipher(key, hmac=True).encrypt(text)
64 enc = 'xyz' + enc[3:]
120 enc = b'xyz' + enc[3:]
65 with pytest.raises(SignatureVerificationError) as e:
121 with pytest.raises(SignatureVerificationError) as e:
66 assert AESCipher(key, hmac=True).decrypt(enc) == text
122 assert AESCipher(key, hmac=True).decrypt(enc, safe=False) == text
67
68 assert 'Encryption signature verification failed' in str(e)
69
123
70 def test_encryption_with_hmac_with_bad_key_not_strict(self):
124 assert 'SignatureVerificationError' in str(e)
71 key = 'secretstring'
72 text = 'ihatemysql'
73 enc = AESCipher(key, hmac=True).encrypt(text)
74
75 assert isinstance(AESCipher(
76 'differentsecret', hmac=True, strict_verification=False
77 ).decrypt(enc), InvalidDecryptedValue)
78
125
79
126
80 class TestEncryptModule2(object):
127 def test_encryption_with_hmac_with_bad_key_not_strict():
81
128 key = b'secretstring'
82 @pytest.mark.parametrize(
129 text = b'ihatemysql'
83 "key, text",
130 enc = AESCipher(key, hmac=True).encrypt(text)
84 [
85 ('a', 'short'),
86 ('a'*64, 'too long(trimmed to 32)'),
87 ('a'*32, 'just enough'),
88 ('ąćęćę', 'non asci'),
89 ('$asa$asa', 'special $ used'),
90 ]
91 )
92 def test_encryption(self, key, text):
93 enc = Encryptor(key).encrypt(text)
94 assert Encryptor(key).decrypt(enc) == text
95
96 def test_encryption_with_bad_key(self):
97 key = 'secretstring'
98 text = 'ihatemysql'
99 enc = Encryptor(key).encrypt(text)
100
101 assert Encryptor('differentsecret').decrypt(enc) == ''
102
131
103 def test_encryption_with_bad_key_raises(self):
132 decrypted = AESCipher(
104 key = 'secretstring'
133 b'differentsecret', hmac=True, strict_verification=False
105 text = 'ihatemysql'
134 ).decrypt(enc)
106 enc = Encryptor(key).encrypt(text)
107
108 with pytest.raises(InvalidToken) as e:
109 Encryptor('differentsecret').decrypt(enc, safe=False)
110
111 assert 'InvalidToken' in str(e)
112
113 def test_encryption_with_bad_format_data(self):
114 key = 'secret'
115 text = 'ihatemysql'
116 enc = Encryptor(key).encrypt(text)
117 enc = '$xyz' + enc[3:]
118
135
119 with pytest.raises(ValueError) as e:
136 assert isinstance(decrypted, InvalidDecryptedValue)
120 Encryptor(key).decrypt(enc, safe=False)
121
122 assert 'Encrypted Data has invalid format' in str(e)
123
124 def test_encryption_with_bad_data(self):
125 key = 'secret'
126 text = 'ihatemysql'
127 enc = Encryptor(key).encrypt(text)
128 enc = enc[:-5]
129
130 with pytest.raises(InvalidToken) as e:
131 Encryptor(key).decrypt(enc, safe=False)
132
133 assert 'InvalidToken' in str(e)
@@ -439,7 +439,7 b' def main(argv):'
439 return 1
439 return 1
440
440
441 sizes = options.sizes.split(',')
441 sizes = options.sizes.split(',')
442 sizes = map(int, sizes)
442 sizes = list(map(int, sizes))
443
443
444 base_dir = options.dir
444 base_dir = options.dir
445 api_key = options.api_key
445 api_key = options.api_key
@@ -24,7 +24,9 b' import logging'
24 import os.path
24 import os.path
25 import subprocess
25 import subprocess
26 import tempfile
26 import tempfile
27 import urllib.request, urllib.error, urllib.parse
27 import urllib.request
28 import urllib.error
29 import urllib.parse
28 from lxml.html import fromstring, tostring
30 from lxml.html import fromstring, tostring
29 from lxml.cssselect import CSSSelector
31 from lxml.cssselect import CSSSelector
30 from urllib.parse import unquote_plus
32 from urllib.parse import unquote_plus
@@ -54,7 +56,7 b' log = logging.getLogger(__name__)'
54 class CustomTestResponse(TestResponse):
56 class CustomTestResponse(TestResponse):
55
57
56 def _save_output(self, out):
58 def _save_output(self, out):
57 f = tempfile.NamedTemporaryFile(delete=False, prefix='rc-test-', suffix='.html')
59 f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html')
58 f.write(out)
60 f.write(out)
59 return f.name
61 return f.name
60
62
@@ -82,7 +84,7 b' class CustomTestResponse(TestResponse):'
82 f = self._save_output(str(self))
84 f = self._save_output(str(self))
83
85
84 for s in strings:
86 for s in strings:
85 if not s in self:
87 if s not in self:
86 print_stderr("Actual response (no %r):" % s)
88 print_stderr("Actual response (no %r):" % s)
87 print_stderr("body output saved as `%s`" % f)
89 print_stderr("body output saved as `%s`" % f)
88 if print_body:
90 if print_body:
@@ -113,13 +115,24 b' class CustomTestResponse(TestResponse):'
113
115
114 class TestRequest(webob.BaseRequest):
116 class TestRequest(webob.BaseRequest):
115
117
116 # for py.test
118 # for py.test, so it doesn't try to run this tas by name starting with test...
117 disabled = True
119 disabled = True
118 ResponseClass = CustomTestResponse
120 ResponseClass = CustomTestResponse
119
121
120 def add_response_callback(self, callback):
122 def add_response_callback(self, callback):
121 pass
123 pass
122
124
125 @classmethod
126 def blank(cls, path, environ=None, base_url=None,
127 headers=None, POST=None, **kw):
128
129 if not path.isascii():
130 # our custom quote path if it contains non-ascii chars
131 path = urllib.parse.quote(path)
132
133 return super(TestRequest, cls).blank(
134 path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw)
135
123
136
124 class CustomTestApp(TestApp):
137 class CustomTestApp(TestApp):
125 """
138 """
@@ -260,6 +273,7 b' class AssertResponse(object):'
260 def element_equals_to(self, css_selector, expected_content):
273 def element_equals_to(self, css_selector, expected_content):
261 element = self.get_element(css_selector)
274 element = self.get_element(css_selector)
262 element_text = self._element_to_string(element)
275 element_text = self._element_to_string(element)
276
263 assert expected_content in element_text
277 assert expected_content in element_text
264
278
265 def element_contains(self, css_selector, expected_content):
279 def element_contains(self, css_selector, expected_content):
@@ -306,7 +320,7 b' class AssertResponse(object):'
306
320
307 def _element_to_string(self, element):
321 def _element_to_string(self, element):
308 fromstring, tostring, CSSSelector = self.get_imports()
322 fromstring, tostring, CSSSelector = self.get_imports()
309 return tostring(element)
323 return tostring(element, encoding='unicode')
310
324
311
325
312 class _Url(object):
326 class _Url(object):
@@ -33,6 +33,7 b' import logging'
33 import os
33 import os
34 import tempfile
34 import tempfile
35
35
36 from rhodecode.lib.str_utils import safe_str
36 from rhodecode.tests import GIT_REPO, HG_REPO
37 from rhodecode.tests import GIT_REPO, HG_REPO
37
38
38 DEBUG = True
39 DEBUG = True
@@ -68,6 +69,10 b' class Command(object):'
68 self.process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE,
69 self.process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE,
69 cwd=self.cwd, env=env)
70 cwd=self.cwd, env=env)
70 stdout, stderr = self.process.communicate()
71 stdout, stderr = self.process.communicate()
72
73 stdout = safe_str(stdout)
74 stderr = safe_str(stderr)
75
71 if DEBUG:
76 if DEBUG:
72 log.debug('STDOUT:%s', stdout)
77 log.debug('STDOUT:%s', stdout)
73 log.debug('STDERR:%s', stderr)
78 log.debug('STDERR:%s', stderr)
@@ -78,8 +83,9 b' class Command(object):'
78
83
79
84
80 def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs):
85 def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs):
81 git_ident = "git config user.name {} && git config user.email {}".format(
86 full_name = 'Marcin Kuźminski'
82 'Marcin Kuźminski', 'me@email.com')
87 email = 'me@email.com'
88 git_ident = f"git config user.name {full_name} && git config user.email {email}"
83 cwd = path = jn(dest)
89 cwd = path = jn(dest)
84
90
85 tags = tags or []
91 tags = tags or []
General Comments 0
You need to be logged in to leave comments. Login now