Show More
@@ -51,7 +51,7 b' class TestMyAccountSimpleViews(TestContr' | |||
|
51 | 51 | Repository.user == User.get_by_username( |
|
52 | 52 | TEST_USER_ADMIN_LOGIN)).all() |
|
53 | 53 | for repo in repos: |
|
54 |
response.mustcontain('"name_raw": |
|
|
54 | response.mustcontain(f'"name_raw":"{repo.repo_name}"') | |
|
55 | 55 | |
|
56 | 56 | def test_my_account_my_watched(self, autologin_user): |
|
57 | 57 | response = self.app.get(route_path('my_account_watched')) |
@@ -60,8 +60,7 b' class TestMyAccountSimpleViews(TestContr' | |||
|
60 | 60 | UserFollowing.user == User.get_by_username( |
|
61 | 61 | TEST_USER_ADMIN_LOGIN)).all() |
|
62 | 62 | for repo in repos: |
|
63 | response.mustcontain( | |
|
64 | '"name_raw": "%s"' % repo.follows_repository.repo_name) | |
|
63 | response.mustcontain(f'"name_raw":"{repo.follows_repository.repo_name}"') | |
|
65 | 64 | |
|
66 | 65 | def test_my_account_perms(self, autologin_user): |
|
67 | 66 | response = self.app.get(route_path('my_account_perms')) |
@@ -107,7 +107,7 b' class TestChangelogController(TestContro' | |||
|
107 | 107 | assert expected_url in response.location |
|
108 | 108 | response = response.follow() |
|
109 | 109 | expected_warning = 'Branch {} is not found.'.format(branch) |
|
110 |
assert expected_warning in response. |
|
|
110 | assert expected_warning in response.text | |
|
111 | 111 | |
|
112 | 112 | @pytest.mark.xfail_backends("svn", reason="Depends on branch support") |
|
113 | 113 | def test_changelog_filtered_by_branch_with_merges( |
@@ -59,20 +59,20 b' class TestRepoCommitView(object):' | |||
|
59 | 59 | response = self.app.get(route_path( |
|
60 | 60 | 'repo_commit_raw', |
|
61 | 61 | repo_name=backend.repo_name, commit_id=commit_id)) |
|
62 |
assert response. |
|
|
62 | assert response.text == self.diffs[backend.alias] | |
|
63 | 63 | |
|
64 | 64 | def test_show_raw_patch(self, backend): |
|
65 | 65 | response = self.app.get(route_path( |
|
66 | 66 | 'repo_commit_patch', repo_name=backend.repo_name, |
|
67 | 67 | commit_id=self.commit_id[backend.alias])) |
|
68 |
assert response. |
|
|
68 | assert response.text == self.patches[backend.alias] | |
|
69 | 69 | |
|
70 | 70 | def test_commit_download(self, backend): |
|
71 | 71 | response = self.app.get(route_path( |
|
72 | 72 | 'repo_commit_download', |
|
73 | 73 | repo_name=backend.repo_name, |
|
74 | 74 | commit_id=self.commit_id[backend.alias])) |
|
75 |
assert response. |
|
|
75 | assert response.text == self.diffs[backend.alias] | |
|
76 | 76 | |
|
77 | 77 | def test_single_commit_page_different_ops(self, backend): |
|
78 | 78 | commit_id = { |
@@ -600,7 +600,7 b' class TestCompareControllerSvn(object):' | |||
|
600 | 600 | status=200) |
|
601 | 601 | |
|
602 | 602 | # It should show commits |
|
603 |
assert 'No commits in this compare' not in response. |
|
|
603 | assert 'No commits in this compare' not in response.text | |
|
604 | 604 | |
|
605 | 605 | # Should find only one file changed when comparing those two tags |
|
606 | 606 | response.mustcontain('example.py') |
@@ -398,7 +398,7 b' class TestFilesViews(object):' | |||
|
398 | 398 | repo_name=backend.repo_name, |
|
399 | 399 | commit_id=commit.raw_id, f_path='README.rst'), |
|
400 | 400 | extra_environ=xhr_header) |
|
401 |
assert response. |
|
|
401 | assert response.text == '' | |
|
402 | 402 | |
|
403 | 403 | def test_nodetree_wrong_path(self, backend, xhr_header): |
|
404 | 404 | commit = backend.repo.get_commit(commit_idx=173) |
@@ -410,7 +410,7 b' class TestFilesViews(object):' | |||
|
410 | 410 | |
|
411 | 411 | err = 'error: There is no file nor ' \ |
|
412 | 412 | 'directory at the given path' |
|
413 |
assert err in response. |
|
|
413 | assert err in response.text | |
|
414 | 414 | |
|
415 | 415 | def test_nodetree_missing_xhr(self, backend): |
|
416 | 416 | self.app.get( |
@@ -83,7 +83,7 b' class TestRepoIssueTracker(object):' | |||
|
83 | 83 | repo_name=backend.repo.repo_name), |
|
84 | 84 | extra_environ=xhr_header, params=data) |
|
85 | 85 | |
|
86 |
assert response. |
|
|
86 | assert response.text == \ | |
|
87 | 87 | 'example of <a class="tooltip issue-tracker-link" href="http://url" title="description">prefix</a> replacement' |
|
88 | 88 | |
|
89 | 89 | @request.addfinalizer |
@@ -1644,7 +1644,7 b' class TestPullrequestsControllerDelete(o' | |||
|
1644 | 1644 | params={'csrf_token': csrf_token}, |
|
1645 | 1645 | status=200 |
|
1646 | 1646 | ) |
|
1647 |
assert response. |
|
|
1647 | assert response.text == 'true' | |
|
1648 | 1648 | |
|
1649 | 1649 | @pytest.mark.parametrize('url_type', [ |
|
1650 | 1650 | 'pullrequest_new', |
@@ -22,11 +22,9 b' import os' | |||
|
22 | 22 | import time |
|
23 | 23 | import logging |
|
24 | 24 | import datetime |
|
25 | import hashlib | |
|
26 | 25 | import tempfile |
|
27 | 26 | from os.path import join as jn |
|
28 | ||
|
29 | from tempfile import _RandomNameSequence | |
|
27 | import urllib.parse | |
|
30 | 28 | |
|
31 | 29 | import pytest |
|
32 | 30 | |
@@ -34,8 +32,8 b' from rhodecode.model.db import User' | |||
|
34 | 32 | from rhodecode.lib import auth |
|
35 | 33 | from rhodecode.lib import helpers as h |
|
36 | 34 | from rhodecode.lib.helpers import flash |
|
37 |
from rhodecode.lib.utils |
|
|
38 | ||
|
35 | from rhodecode.lib.str_utils import safe_str | |
|
36 | from rhodecode.lib.hash_utils import sha1_safe | |
|
39 | 37 | |
|
40 | 38 | log = logging.getLogger(__name__) |
|
41 | 39 | |
@@ -57,7 +55,7 b' log = logging.getLogger(__name__)' | |||
|
57 | 55 | # SOME GLOBALS FOR TESTS |
|
58 | 56 | TEST_DIR = tempfile.gettempdir() |
|
59 | 57 | |
|
60 | TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_{}'.format(next(_RandomNameSequence()))) | |
|
58 | TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_{}'.format(next(tempfile._RandomNameSequence()))) | |
|
61 | 59 | TEST_USER_ADMIN_LOGIN = 'test_admin' |
|
62 | 60 | TEST_USER_ADMIN_PASS = 'test12' |
|
63 | 61 | TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com' |
@@ -85,12 +83,12 b" SCM_TESTS = ['hg', 'git']" | |||
|
85 | 83 | uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple()))) |
|
86 | 84 | |
|
87 | 85 | TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO) |
|
88 |
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone |
|
|
89 |
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull |
|
|
86 | TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcsgitclone{uniq_suffix}') | |
|
87 | TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, f'vcsgitpull{uniq_suffix}') | |
|
90 | 88 | |
|
91 | 89 | TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO) |
|
92 |
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone |
|
|
93 |
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull |
|
|
90 | TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, f'vcshgclone{uniq_suffix}') | |
|
91 | TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, f'vcshgpull{uniq_suffix}') | |
|
94 | 92 | |
|
95 | 93 | TEST_REPO_PREFIX = 'vcs-test' |
|
96 | 94 | |
@@ -111,7 +109,7 b' def get_new_dir(title):' | |||
|
111 | 109 | name_parts = [TEST_REPO_PREFIX] |
|
112 | 110 | if title: |
|
113 | 111 | name_parts.append(title) |
|
114 |
hex_str = |
|
|
112 | hex_str = sha1_safe(f'{os.getpid()} {time.time()}') | |
|
115 | 113 | name_parts.append(hex_str) |
|
116 | 114 | name = '-'.join(name_parts) |
|
117 | 115 | path = os.path.join(TEST_DIR, name) |
@@ -154,8 +152,8 b' def login_user_session(' | |||
|
154 | 152 | response = app.post( |
|
155 | 153 | h.route_path('login'), |
|
156 | 154 | {'username': username, 'password': password}) |
|
157 |
if 'invalid user name' in response. |
|
|
158 |
pytest.fail('could not login using |
|
|
155 | if 'invalid user name' in response.text: | |
|
156 | pytest.fail(f'could not login using {username} {password}') | |
|
159 | 157 | |
|
160 | 158 | assert response.status == '302 Found' |
|
161 | 159 | response = response.follow() |
@@ -203,9 +201,9 b' def assert_session_flash(response, msg=N' | |||
|
203 | 201 | msg = _eval_if_lazy(msg) |
|
204 | 202 | |
|
205 | 203 | if no_: |
|
206 |
error_msg = 'unable to detect no_ message ` |
|
|
204 | error_msg = f'unable to detect no_ message `{no_}` in empty flash list' | |
|
207 | 205 | else: |
|
208 |
error_msg = 'unable to find message ` |
|
|
206 | error_msg = f'unable to find message `{msg}` in empty flash list' | |
|
209 | 207 | assert messages, error_msg |
|
210 | 208 | message = messages[0] |
|
211 | 209 | |
@@ -213,13 +211,12 b' def assert_session_flash(response, msg=N' | |||
|
213 | 211 | |
|
214 | 212 | if no_: |
|
215 | 213 | if no_ in message_text: |
|
216 |
msg = |
|
|
214 | msg = f'msg `{no_}` found in session flash.' | |
|
217 | 215 | pytest.fail(safe_str(msg)) |
|
218 | 216 | else: |
|
219 | 217 | if msg not in message_text: |
|
220 |
fail_msg = |
|
|
221 |
|
|
|
222 | msg, message_text, type(message_text)) | |
|
218 | fail_msg = f'msg `{msg}` not found in ' \ | |
|
219 | f'session flash: got `{message_text}` (type:{type(message_text)}) instead' | |
|
223 | 220 | |
|
224 | 221 | pytest.fail(safe_str(fail_msg)) |
|
225 | 222 | if category: |
@@ -235,7 +232,7 b' def no_newline_id_generator(test_name):' | |||
|
235 | 232 | Generates a test name without spaces or newlines characters. Used for |
|
236 | 233 | nicer output of progress of test |
|
237 | 234 | """ |
|
238 | org_name = test_name | |
|
235 | ||
|
239 | 236 | test_name = safe_str(test_name)\ |
|
240 | 237 | .replace('\n', '_N') \ |
|
241 | 238 | .replace('\r', '_N') \ |
@@ -246,7 +243,6 b' def no_newline_id_generator(test_name):' | |||
|
246 | 243 | |
|
247 | 244 | |
|
248 | 245 | def route_path_generator(url_defs, name, params=None, **kwargs): |
|
249 | import urllib.request, urllib.parse, urllib.error | |
|
250 | 246 | |
|
251 | 247 | base_url = url_defs[name].format(**kwargs) |
|
252 | 248 |
@@ -143,7 +143,7 b' def get_available_port(min_port=40000, m' | |||
|
143 | 143 | @pytest.fixture(scope='session') |
|
144 | 144 | def rcserver_port(request): |
|
145 | 145 | port = get_available_port() |
|
146 |
print('Using rhodecode port {}' |
|
|
146 | print(f'Using rhodecode port {port}') | |
|
147 | 147 | return port |
|
148 | 148 | |
|
149 | 149 | |
@@ -152,7 +152,7 b' def vcsserver_port(request):' | |||
|
152 | 152 | port = request.config.getoption('--vcsserver-port') |
|
153 | 153 | if port is None: |
|
154 | 154 | port = get_available_port() |
|
155 |
print('Using vcsserver port {}' |
|
|
155 | print(f'Using vcsserver port {port}') | |
|
156 | 156 | return port |
|
157 | 157 | |
|
158 | 158 |
@@ -20,7 +20,6 b'' | |||
|
20 | 20 | |
|
21 | 21 | import collections |
|
22 | 22 | import datetime |
|
23 | import hashlib | |
|
24 | 23 | import os |
|
25 | 24 | import re |
|
26 | 25 | import pprint |
@@ -31,6 +30,7 b' import time' | |||
|
31 | 30 | import uuid |
|
32 | 31 | import dateutil.tz |
|
33 | 32 | import logging |
|
33 | import functools | |
|
34 | 34 | |
|
35 | 35 | import mock |
|
36 | 36 | import pyramid.testing |
@@ -40,7 +40,7 b' import requests' | |||
|
40 | 40 | import pyramid.paster |
|
41 | 41 | |
|
42 | 42 | import rhodecode |
|
43 | from rhodecode.lib.utils2 import AttributeDict | |
|
43 | import rhodecode.lib | |
|
44 | 44 | from rhodecode.model.changeset_status import ChangesetStatusModel |
|
45 | 45 | from rhodecode.model.comment import CommentsModel |
|
46 | 46 | from rhodecode.model.db import ( |
@@ -57,6 +57,7 b' from rhodecode.model.integration import ' | |||
|
57 | 57 | from rhodecode.integrations import integration_type_registry |
|
58 | 58 | from rhodecode.integrations.types.base import IntegrationTypeBase |
|
59 | 59 | from rhodecode.lib.utils import repo2db_mapper |
|
60 | from rhodecode.lib.hash_utils import sha1_safe | |
|
60 | 61 | from rhodecode.lib.vcs.backends import get_backend |
|
61 | 62 | from rhodecode.lib.vcs.nodes import FileNode |
|
62 | 63 | from rhodecode.tests import ( |
@@ -74,6 +75,7 b' def cmp(a, b):' | |||
|
74 | 75 | # backport cmp from python2 so we can still use it in the custom code in this module |
|
75 | 76 | return (a > b) - (a < b) |
|
76 | 77 | |
|
78 | ||
|
77 | 79 | @pytest.fixture(scope='session', autouse=True) |
|
78 | 80 | def activate_example_rcextensions(request): |
|
79 | 81 | """ |
@@ -1265,7 +1267,7 b' class UserUtility(object):' | |||
|
1265 | 1267 | return cmp(second_group_parts, first_group_parts) |
|
1266 | 1268 | |
|
1267 | 1269 | sorted_repo_group_ids = sorted( |
|
1268 |
self.repo_group_ids, |
|
|
1270 | self.repo_group_ids, key=functools.cmp_to_key(_repo_group_compare)) | |
|
1269 | 1271 | for repo_group_id in sorted_repo_group_ids: |
|
1270 | 1272 | self.fixture.destroy_repo_group(repo_group_id) |
|
1271 | 1273 | |
@@ -1290,7 +1292,7 b' class UserUtility(object):' | |||
|
1290 | 1292 | return cmp(second_group_parts, first_group_parts) |
|
1291 | 1293 | |
|
1292 | 1294 | sorted_user_group_ids = sorted( |
|
1293 |
self.user_group_ids, |
|
|
1295 | self.user_group_ids, key=functools.cmp_to_key(_user_group_compare)) | |
|
1294 | 1296 | for user_group_id in sorted_user_group_ids: |
|
1295 | 1297 | self.fixture.destroy_user_group(user_group_id) |
|
1296 | 1298 | |
@@ -1429,8 +1431,7 b' class SettingsUtility(object):' | |||
|
1429 | 1431 | |
|
1430 | 1432 | def create_repo_rhodecode_ui( |
|
1431 | 1433 | self, repo, section, value, key=None, active=True, cleanup=True): |
|
1432 | key = key or hashlib.sha1( | |
|
1433 | '{}{}{}'.format(section, value, repo.repo_id)).hexdigest() | |
|
1434 | key = key or sha1_safe(f'{section}{value}{repo.repo_id}') | |
|
1434 | 1435 | |
|
1435 | 1436 | setting = RepoRhodeCodeUi() |
|
1436 | 1437 | setting.repository_id = repo.repo_id |
@@ -1447,7 +1448,7 b' class SettingsUtility(object):' | |||
|
1447 | 1448 | |
|
1448 | 1449 | def create_rhodecode_ui( |
|
1449 | 1450 | self, section, value, key=None, active=True, cleanup=True): |
|
1450 |
key = key or |
|
|
1451 | key = key or sha1_safe(f'{section}{value}') | |
|
1451 | 1452 | |
|
1452 | 1453 | setting = RhodeCodeUi() |
|
1453 | 1454 | setting.ui_section = section |
@@ -27,7 +27,7 b' def test_vcs_available_returns_summary_p' | |||
|
27 | 27 | url = '/{repo_name}'.format(repo_name=backend.repo.repo_name) |
|
28 | 28 | response = app.get(url) |
|
29 | 29 | assert response.status_code == 200 |
|
30 |
assert 'Summary' in response. |
|
|
30 | assert 'Summary' in response.text | |
|
31 | 31 | |
|
32 | 32 | |
|
33 | 33 | @pytest.mark.usefixtures('autologin_user', 'app') |
@@ -48,4 +48,4 b' def test_vcs_unavailable_returns_vcs_err' | |||
|
48 | 48 | response = app.get(url, expect_errors=True) |
|
49 | 49 | |
|
50 | 50 | assert response.status_code == 502 |
|
51 |
assert 'Could not connect to VCS Server' in response. |
|
|
51 | assert 'Could not connect to VCS Server' in response.text |
@@ -96,4 +96,4 b' def test_remote_app_caller():' | |||
|
96 | 96 | ('Content-Type', 'text/plain'), |
|
97 | 97 | ('Content-Length', '7'), |
|
98 | 98 | ]) |
|
99 |
assert response. |
|
|
99 | assert response.text == 'content' |
@@ -21,113 +21,116 b'' | |||
|
21 | 21 | import pytest |
|
22 | 22 | |
|
23 | 23 | from rhodecode.lib.encrypt import ( |
|
24 |
AESCipher, |
|
|
25 |
from rhodecode.lib |
|
|
24 | AESCipher, InvalidDecryptedValue) | |
|
25 | from rhodecode.lib import enc_utils | |
|
26 | from rhodecode.lib.str_utils import safe_str | |
|
27 | from rhodecode.lib.exceptions import SignatureVerificationError | |
|
28 | ||
|
29 | ||
|
30 | @pytest.mark.parametrize( | |
|
31 | "algo", ['fernet', 'aes'], | |
|
32 | ) | |
|
33 | @pytest.mark.parametrize( | |
|
34 | "key, text", | |
|
35 | [ | |
|
36 | (b'a', 'short'), | |
|
37 | (b'a' * 64, 'too long(trimmed to 32)'), | |
|
38 | (b'a' * 32, 'just enough'), | |
|
39 | ('ąćęćę', 'non asci'), | |
|
40 | ('$asa$asa', 'special $ used'), | |
|
41 | ] | |
|
42 | ) | |
|
43 | @pytest.mark.parametrize( | |
|
44 | "strict_mode", [True, False], | |
|
45 | ) | |
|
46 | def test_common_encryption_module(algo, key, text, strict_mode): | |
|
47 | encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key) | |
|
48 | decrypted = enc_utils.decrypt_value(encrypted, algo=algo, enc_key=key, strict_mode=strict_mode) | |
|
49 | assert text == safe_str(decrypted) | |
|
50 | ||
|
51 | ||
|
52 | @pytest.mark.parametrize( | |
|
53 | "algo", ['fernet', 'aes'], | |
|
54 | ) | |
|
55 | def test_encryption_with_bad_key(algo): | |
|
56 | key = b'secretstring' | |
|
57 | text = b'ihatemysql' | |
|
58 | ||
|
59 | encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key) | |
|
60 | decrypted = enc_utils.decrypt_value(encrypted, algo=algo, enc_key=b'different-key', strict_mode=False) | |
|
61 | ||
|
62 | assert decrypted[:22] == '<InvalidDecryptedValue' | |
|
63 | ||
|
64 | ||
|
65 | @pytest.mark.parametrize( | |
|
66 | "algo", ['fernet', 'aes'], | |
|
67 | ) | |
|
68 | def test_encryption_with_bad_key_raises(algo): | |
|
69 | key = b'secretstring' | |
|
70 | text = b'ihatemysql' | |
|
71 | encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key) | |
|
72 | ||
|
73 | with pytest.raises(SignatureVerificationError) as e: | |
|
74 | enc_utils.decrypt_value(encrypted, algo=algo, enc_key=b'different-key', strict_mode=True) | |
|
75 | ||
|
76 | assert 'InvalidDecryptedValue' in str(e) | |
|
26 | 77 | |
|
27 | 78 | |
|
28 | class TestEncryptModule(object): | |
|
29 | ||
|
30 | @pytest.mark.parametrize( | |
|
31 | "key, text", | |
|
32 | [ | |
|
33 | ('a', 'short'), | |
|
34 | ('a'*64, 'too long(trimmed to 32)'), | |
|
35 | ('a'*32, 'just enough'), | |
|
36 | ('ąćęćę', 'non asci'), | |
|
37 | ('$asa$asa', 'special $ used'), | |
|
38 | ] | |
|
39 | ) | |
|
40 | def test_encryption(self, key, text): | |
|
41 | enc = AESCipher(key).encrypt(text) | |
|
42 | assert AESCipher(key).decrypt(enc) == text | |
|
43 | ||
|
44 | def test_encryption_with_hmac(self): | |
|
45 | key = 'secret' | |
|
46 | text = 'ihatemysql' | |
|
47 | enc = AESCipher(key, hmac=True).encrypt(text) | |
|
48 | assert AESCipher(key, hmac=True).decrypt(enc) == text | |
|
79 | @pytest.mark.parametrize( | |
|
80 | "algo", ['fernet', 'aes'], | |
|
81 | ) | |
|
82 | def test_encryption_with_bad_format_data(algo): | |
|
83 | key = b'secret' | |
|
84 | text = b'ihatemysql' | |
|
85 | encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key) | |
|
86 | encrypted = b'$xyz' + encrypted[3:] | |
|
49 | 87 | |
|
50 | def test_encryption_with_hmac_with_bad_key(self): | |
|
51 | key = 'secretstring' | |
|
52 | text = 'ihatemysql' | |
|
53 | enc = AESCipher(key, hmac=True).encrypt(text) | |
|
54 | ||
|
55 | with pytest.raises(SignatureVerificationError) as e: | |
|
56 | assert AESCipher('differentsecret', hmac=True).decrypt(enc) == '' | |
|
57 | ||
|
58 | assert 'Encryption signature verification failed' in str(e) | |
|
88 | with pytest.raises(ValueError) as e: | |
|
89 | enc_utils.decrypt_value(encrypted, algo=algo, enc_key=key, strict_mode=True) | |
|
59 | 90 | |
|
60 | def test_encryption_with_hmac_with_bad_data(self): | |
|
61 | key = 'secret' | |
|
62 | text = 'ihatemysql' | |
|
63 | enc = AESCipher(key, hmac=True).encrypt(text) | |
|
64 | enc = 'xyz' + enc[3:] | |
|
65 | with pytest.raises(SignatureVerificationError) as e: | |
|
66 | assert AESCipher(key, hmac=True).decrypt(enc) == text | |
|
67 | ||
|
68 | assert 'Encryption signature verification failed' in str(e) | |
|
69 | ||
|
70 | def test_encryption_with_hmac_with_bad_key_not_strict(self): | |
|
71 | key = 'secretstring' | |
|
72 | text = 'ihatemysql' | |
|
73 | enc = AESCipher(key, hmac=True).encrypt(text) | |
|
74 | ||
|
75 | assert isinstance(AESCipher( | |
|
76 | 'differentsecret', hmac=True, strict_verification=False | |
|
77 | ).decrypt(enc), InvalidDecryptedValue) | |
|
91 | assert 'Encrypted Data has invalid format' in str(e) | |
|
78 | 92 | |
|
79 | 93 | |
|
80 | class TestEncryptModule2(object): | |
|
94 | @pytest.mark.parametrize( | |
|
95 | "algo", ['fernet', 'aes'], | |
|
96 | ) | |
|
97 | def test_encryption_with_bad_data(algo): | |
|
98 | key = b'secret' | |
|
99 | text = b'ihatemysql' | |
|
100 | encrypted = enc_utils.encrypt_value(text, algo=algo, enc_key=key) | |
|
101 | encrypted = encrypted[:-5] | |
|
81 | 102 | |
|
82 | @pytest.mark.parametrize( | |
|
83 | "key, text", | |
|
84 | [ | |
|
85 | ('a', 'short'), | |
|
86 | ('a'*64, 'too long(trimmed to 32)'), | |
|
87 | ('a'*32, 'just enough'), | |
|
88 | ('ąćęćę', 'non asci'), | |
|
89 | ('$asa$asa', 'special $ used'), | |
|
90 | ] | |
|
91 | ) | |
|
92 | def test_encryption(self, key, text): | |
|
93 | enc = Encryptor(key).encrypt(text) | |
|
94 | assert Encryptor(key).decrypt(enc) == text | |
|
103 | with pytest.raises(SignatureVerificationError) as e: | |
|
104 | enc_utils.decrypt_value(encrypted, algo=algo, enc_key=key, strict_mode=True) | |
|
105 | ||
|
106 | assert 'SignatureVerificationError' in str(e) | |
|
107 | ||
|
95 | 108 | |
|
96 |
|
|
|
97 |
|
|
|
98 |
|
|
|
99 |
|
|
|
109 | def test_encryption_with_hmac(): | |
|
110 | key = b'secret' | |
|
111 | text = b'ihatemysql' | |
|
112 | enc = AESCipher(key, hmac=True).encrypt(text) | |
|
113 | assert AESCipher(key, hmac=True).decrypt(enc) == text | |
|
100 | 114 | |
|
101 | assert Encryptor('differentsecret').decrypt(enc) == '' | |
|
102 | 115 | |
|
103 |
|
|
|
104 |
|
|
|
105 |
|
|
|
106 |
|
|
|
107 | ||
|
108 |
|
|
|
109 |
|
|
|
116 | def test_encryption_with_hmac_with_bad_data(): | |
|
117 | key = b'secret' | |
|
118 | text = b'ihatemysql' | |
|
119 | enc = AESCipher(key, hmac=True).encrypt(text) | |
|
120 | enc = b'xyz' + enc[3:] | |
|
121 | with pytest.raises(SignatureVerificationError) as e: | |
|
122 | assert AESCipher(key, hmac=True).decrypt(enc, safe=False) == text | |
|
110 | 123 | |
|
111 |
|
|
|
124 | assert 'SignatureVerificationError' in str(e) | |
|
112 | 125 | |
|
113 | def test_encryption_with_bad_format_data(self): | |
|
114 | key = 'secret' | |
|
115 | text = 'ihatemysql' | |
|
116 | enc = Encryptor(key).encrypt(text) | |
|
117 | enc = '$xyz' + enc[3:] | |
|
118 | 126 | |
|
119 | with pytest.raises(ValueError) as e: | |
|
120 | Encryptor(key).decrypt(enc, safe=False) | |
|
121 | ||
|
122 | assert 'Encrypted Data has invalid format' in str(e) | |
|
127 | def test_encryption_with_hmac_with_bad_key_not_strict(): | |
|
128 | key = b'secretstring' | |
|
129 | text = b'ihatemysql' | |
|
130 | enc = AESCipher(key, hmac=True).encrypt(text) | |
|
123 | 131 | |
|
124 | def test_encryption_with_bad_data(self): | |
|
125 | key = 'secret' | |
|
126 | text = 'ihatemysql' | |
|
127 | enc = Encryptor(key).encrypt(text) | |
|
128 | enc = enc[:-5] | |
|
132 | decrypted = AESCipher( | |
|
133 | b'differentsecret', hmac=True, strict_verification=False | |
|
134 | ).decrypt(enc) | |
|
129 | 135 | |
|
130 | with pytest.raises(InvalidToken) as e: | |
|
131 | Encryptor(key).decrypt(enc, safe=False) | |
|
132 | ||
|
133 | assert 'InvalidToken' in str(e) | |
|
136 | assert isinstance(decrypted, InvalidDecryptedValue) |
@@ -439,7 +439,7 b' def main(argv):' | |||
|
439 | 439 | return 1 |
|
440 | 440 | |
|
441 | 441 | sizes = options.sizes.split(',') |
|
442 | sizes = map(int, sizes) | |
|
442 | sizes = list(map(int, sizes)) | |
|
443 | 443 | |
|
444 | 444 | base_dir = options.dir |
|
445 | 445 | api_key = options.api_key |
@@ -24,7 +24,9 b' import logging' | |||
|
24 | 24 | import os.path |
|
25 | 25 | import subprocess |
|
26 | 26 | import tempfile |
|
27 |
import urllib.request |
|
|
27 | import urllib.request | |
|
28 | import urllib.error | |
|
29 | import urllib.parse | |
|
28 | 30 | from lxml.html import fromstring, tostring |
|
29 | 31 | from lxml.cssselect import CSSSelector |
|
30 | 32 | from urllib.parse import unquote_plus |
@@ -54,7 +56,7 b' log = logging.getLogger(__name__)' | |||
|
54 | 56 | class CustomTestResponse(TestResponse): |
|
55 | 57 | |
|
56 | 58 | def _save_output(self, out): |
|
57 | f = tempfile.NamedTemporaryFile(delete=False, prefix='rc-test-', suffix='.html') | |
|
59 | f = tempfile.NamedTemporaryFile(mode='w', delete=False, prefix='rc-test-', suffix='.html') | |
|
58 | 60 | f.write(out) |
|
59 | 61 | return f.name |
|
60 | 62 | |
@@ -82,7 +84,7 b' class CustomTestResponse(TestResponse):' | |||
|
82 | 84 | f = self._save_output(str(self)) |
|
83 | 85 | |
|
84 | 86 | for s in strings: |
|
85 |
if not |
|
|
87 | if s not in self: | |
|
86 | 88 | print_stderr("Actual response (no %r):" % s) |
|
87 | 89 | print_stderr("body output saved as `%s`" % f) |
|
88 | 90 | if print_body: |
@@ -113,13 +115,24 b' class CustomTestResponse(TestResponse):' | |||
|
113 | 115 | |
|
114 | 116 | class TestRequest(webob.BaseRequest): |
|
115 | 117 | |
|
116 | # for py.test | |
|
118 | # for py.test, so it doesn't try to run this tas by name starting with test... | |
|
117 | 119 | disabled = True |
|
118 | 120 | ResponseClass = CustomTestResponse |
|
119 | 121 | |
|
120 | 122 | def add_response_callback(self, callback): |
|
121 | 123 | pass |
|
122 | 124 | |
|
125 | @classmethod | |
|
126 | def blank(cls, path, environ=None, base_url=None, | |
|
127 | headers=None, POST=None, **kw): | |
|
128 | ||
|
129 | if not path.isascii(): | |
|
130 | # our custom quote path if it contains non-ascii chars | |
|
131 | path = urllib.parse.quote(path) | |
|
132 | ||
|
133 | return super(TestRequest, cls).blank( | |
|
134 | path, environ=environ, base_url=base_url, headers=headers, POST=POST, **kw) | |
|
135 | ||
|
123 | 136 | |
|
124 | 137 | class CustomTestApp(TestApp): |
|
125 | 138 | """ |
@@ -260,6 +273,7 b' class AssertResponse(object):' | |||
|
260 | 273 | def element_equals_to(self, css_selector, expected_content): |
|
261 | 274 | element = self.get_element(css_selector) |
|
262 | 275 | element_text = self._element_to_string(element) |
|
276 | ||
|
263 | 277 | assert expected_content in element_text |
|
264 | 278 | |
|
265 | 279 | def element_contains(self, css_selector, expected_content): |
@@ -306,7 +320,7 b' class AssertResponse(object):' | |||
|
306 | 320 | |
|
307 | 321 | def _element_to_string(self, element): |
|
308 | 322 | fromstring, tostring, CSSSelector = self.get_imports() |
|
309 | return tostring(element) | |
|
323 | return tostring(element, encoding='unicode') | |
|
310 | 324 | |
|
311 | 325 | |
|
312 | 326 | class _Url(object): |
@@ -33,6 +33,7 b' import logging' | |||
|
33 | 33 | import os |
|
34 | 34 | import tempfile |
|
35 | 35 | |
|
36 | from rhodecode.lib.str_utils import safe_str | |
|
36 | 37 | from rhodecode.tests import GIT_REPO, HG_REPO |
|
37 | 38 | |
|
38 | 39 | DEBUG = True |
@@ -68,6 +69,10 b' class Command(object):' | |||
|
68 | 69 | self.process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, |
|
69 | 70 | cwd=self.cwd, env=env) |
|
70 | 71 | stdout, stderr = self.process.communicate() |
|
72 | ||
|
73 | stdout = safe_str(stdout) | |
|
74 | stderr = safe_str(stderr) | |
|
75 | ||
|
71 | 76 | if DEBUG: |
|
72 | 77 | log.debug('STDOUT:%s', stdout) |
|
73 | 78 | log.debug('STDERR:%s', stderr) |
@@ -78,8 +83,9 b' class Command(object):' | |||
|
78 | 83 | |
|
79 | 84 | |
|
80 | 85 | def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs): |
|
81 | git_ident = "git config user.name {} && git config user.email {}".format( | |
|
82 | 'Marcin Kuźminski', 'me@email.com') | |
|
86 | full_name = 'Marcin Kuźminski' | |
|
87 | email = 'me@email.com' | |
|
88 | git_ident = f"git config user.name {full_name} && git config user.email {email}" | |
|
83 | 89 | cwd = path = jn(dest) |
|
84 | 90 | |
|
85 | 91 | tags = tags or [] |
General Comments 0
You need to be logged in to leave comments.
Login now