Show More
@@ -1,49 +1,49 b'' | |||
|
1 | 1 | from rhodecode.lib.str_utils import safe_bytes |
|
2 | 2 | from rhodecode.lib.encrypt import encrypt_data, validate_and_decrypt_data |
|
3 | 3 | from rhodecode.lib.encrypt2 import Encryptor |
|
4 | 4 | |
|
5 | 5 | ALLOWED_ALGOS = ['aes', 'fernet'] |
|
6 | 6 | |
|
7 | 7 | |
|
8 | 8 | def get_default_algo(): |
|
9 | 9 | import rhodecode |
|
10 | 10 | return rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes' |
|
11 | 11 | |
|
12 | 12 | |
|
13 | 13 | def encrypt_value(value: bytes, enc_key: bytes, algo: str = ''): |
|
14 | 14 | if not algo: |
|
15 | 15 | # not explicit algo, just use what's set by config |
|
16 | 16 | algo = get_default_algo() |
|
17 | 17 | |
|
18 | 18 | if algo not in ALLOWED_ALGOS: |
|
19 | 19 | ValueError(f'Bad encryption algorithm, should be {ALLOWED_ALGOS}, got: {algo}') |
|
20 | 20 | |
|
21 | 21 | enc_key = safe_bytes(enc_key) |
|
22 | 22 | value = safe_bytes(value) |
|
23 | 23 | |
|
24 | 24 | if algo == 'aes': |
|
25 | 25 | return encrypt_data(value, enc_key=enc_key) |
|
26 | 26 | if algo == 'fernet': |
|
27 | 27 | return Encryptor(enc_key).encrypt(value) |
|
28 | 28 | |
|
29 | 29 | return value |
|
30 | 30 | |
|
31 | 31 | |
|
32 | 32 | def decrypt_value(value: bytes, enc_key: bytes, algo: str = '', strict_mode: bool = False): |
|
33 | enc_key = safe_bytes(enc_key) | |
|
34 | value = safe_bytes(value) | |
|
33 | 35 | |
|
34 | 36 | if not algo: |
|
35 | 37 | # not explicit algo, just use what's set by config |
|
36 | algo = get_default_algo() | |
|
38 | algo = Encryptor.detect_enc_algo(value) or get_default_algo() | |
|
37 | 39 | if algo not in ALLOWED_ALGOS: |
|
38 | 40 | ValueError(f'Bad encryption algorithm, should be {ALLOWED_ALGOS}, got: {algo}') |
|
39 | 41 | |
|
40 | enc_key = safe_bytes(enc_key) | |
|
41 | value = safe_bytes(value) | |
|
42 | 42 | safe = not strict_mode |
|
43 | 43 | |
|
44 | 44 | if algo == 'aes': |
|
45 | 45 | return validate_and_decrypt_data(value, enc_key, safe=safe) |
|
46 | 46 | if algo == 'fernet': |
|
47 | 47 | return Encryptor(enc_key).decrypt(value, safe=safe) |
|
48 | 48 | |
|
49 | 49 | return value |
@@ -1,84 +1,97 b'' | |||
|
1 | 1 | import os |
|
2 | 2 | import base64 |
|
3 | 3 | from cryptography.fernet import Fernet, InvalidToken |
|
4 | 4 | from cryptography.hazmat.backends import default_backend |
|
5 | 5 | from cryptography.hazmat.primitives import hashes |
|
6 | 6 | from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC |
|
7 | 7 | |
|
8 | 8 | from rhodecode.lib.str_utils import safe_str |
|
9 | 9 | from rhodecode.lib.exceptions import signature_verification_error |
|
10 | 10 | |
|
11 | 11 | |
|
12 | 12 | class InvalidDecryptedValue(str): |
|
13 | 13 | |
|
14 | 14 | def __new__(cls, content): |
|
15 | 15 | """ |
|
16 | 16 | This will generate something like this:: |
|
17 | 17 | <InvalidDecryptedValue(QkWusFgLJXR6m42v...)> |
|
18 | 18 | And represent a safe indicator that encryption key is broken |
|
19 | 19 | """ |
|
20 | 20 | content = f'<{cls.__name__}({content[:16]}...)>' |
|
21 | 21 | return str.__new__(cls, content) |
|
22 | 22 | |
|
23 | 23 | |
|
24 | 24 | class Encryptor(object): |
|
25 | 25 | key_format = b'enc2$salt:{1}$data:{2}' |
|
26 | ||
|
26 | 27 | pref_len = 5 # salt:, data: |
|
27 | 28 | |
|
29 | @classmethod | |
|
30 | def detect_enc_algo(cls, enc_data: bytes): | |
|
31 | parts = enc_data.split(b'$', 3) | |
|
32 | if len(parts) != 3: | |
|
33 | raise ValueError(f'Encrypted Data has invalid format, expected {cls.key_format}, got {parts}') | |
|
34 | ||
|
35 | if b'enc$aes_hmac$' in enc_data: | |
|
36 | return 'aes' | |
|
37 | elif b'enc2$salt' in enc_data: | |
|
38 | return 'fernet' | |
|
39 | return None | |
|
40 | ||
|
28 | 41 | def __init__(self, enc_key: bytes): |
|
29 | 42 | self.enc_key = enc_key |
|
30 | 43 | |
|
31 | 44 | def b64_encode(self, data): |
|
32 | 45 | return base64.urlsafe_b64encode(data) |
|
33 | 46 | |
|
34 | 47 | def b64_decode(self, data): |
|
35 | 48 | return base64.urlsafe_b64decode(data) |
|
36 | 49 | |
|
37 | 50 | def get_encryptor(self, salt): |
|
38 | 51 | """ |
|
39 | 52 | Uses Fernet as encryptor with HMAC signature |
|
40 | 53 | :param salt: random salt used for encrypting the data |
|
41 | 54 | """ |
|
42 | 55 | kdf = PBKDF2HMAC( |
|
43 | 56 | algorithm=hashes.SHA512(), |
|
44 | 57 | length=32, |
|
45 | 58 | salt=salt, |
|
46 | 59 | iterations=100000, |
|
47 | 60 | backend=default_backend() |
|
48 | 61 | ) |
|
49 | 62 | key = self.b64_encode(kdf.derive(self.enc_key)) |
|
50 | 63 | return Fernet(key) |
|
51 | 64 | |
|
52 | 65 | def _get_parts(self, enc_data): |
|
53 | 66 | parts = enc_data.split(b'$', 3) |
|
54 | 67 | if len(parts) != 3: |
|
55 | 68 | raise ValueError(f'Encrypted Data has invalid format, expected {self.key_format}, got {parts}') |
|
56 | 69 | prefix, salt, enc_data = parts |
|
57 | 70 | |
|
58 | 71 | try: |
|
59 | 72 | salt = self.b64_decode(salt[self.pref_len:]) |
|
60 | 73 | except TypeError: |
|
61 | 74 | # bad base64 |
|
62 | 75 | raise ValueError('Encrypted Data salt invalid format, expected base64 format') |
|
63 | 76 | |
|
64 | 77 | enc_data = enc_data[self.pref_len:] |
|
65 | 78 | return prefix, salt, enc_data |
|
66 | 79 | |
|
67 | 80 | def encrypt(self, data) -> bytes: |
|
68 | 81 | salt = os.urandom(64) |
|
69 | 82 | encryptor = self.get_encryptor(salt) |
|
70 | 83 | enc_data = encryptor.encrypt(data) |
|
71 | 84 | return self.key_format.replace(b'{1}', self.b64_encode(salt)).replace(b'{2}', enc_data) |
|
72 | 85 | |
|
73 | 86 | def decrypt(self, data, safe=True) -> bytes | InvalidDecryptedValue: |
|
74 | 87 | parts = self._get_parts(data) |
|
75 | 88 | salt = parts[1] |
|
76 | 89 | enc_data = parts[2] |
|
77 | 90 | encryptor = self.get_encryptor(salt) |
|
78 | 91 | try: |
|
79 | 92 | return encryptor.decrypt(enc_data) |
|
80 | 93 | except (InvalidToken,): |
|
81 | 94 | decrypt_fail = InvalidDecryptedValue(safe_str(data)) |
|
82 | 95 | if safe: |
|
83 | 96 | return decrypt_fail |
|
84 | 97 | raise signature_verification_error(decrypt_fail) |
General Comments 0
You need to be logged in to leave comments.
Login now