##// END OF EJS Templates
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
super-admin -
r4995:676da06b default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,48 b''
1 from rhodecode.lib.str_utils import safe_bytes
2 from rhodecode.lib.encrypt import encrypt_data, validate_and_decrypt_data
3 from rhodecode.lib.encrypt2 import Encryptor
4
5 ALLOWED_ALGOS = ['aes', 'fernet']
6
7
8 def get_default_algo():
9 import rhodecode
10 return rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes'
11
12
13 def encrypt_value(value: bytes, enc_key: bytes, algo: str = ''):
14 if not algo:
15 # not explicit algo, just use what's set by config
16 algo = get_default_algo()
17 if algo not in ALLOWED_ALGOS:
18 ValueError(f'Bad encryption algorithm, should be {ALLOWED_ALGOS}, got: {algo}')
19
20 enc_key = safe_bytes(enc_key)
21 value = safe_bytes(value)
22
23 if algo == 'aes':
24 return encrypt_data(value, enc_key=enc_key)
25 if algo == 'fernet':
26 return Encryptor(enc_key).encrypt(value)
27
28 return value
29
30
31 def decrypt_value(value: bytes, enc_key: bytes, algo: str = '', strict_mode: bool = False):
32
33 if not algo:
34 # not explicit algo, just use what's set by config
35 algo = get_default_algo()
36 if algo not in ALLOWED_ALGOS:
37 ValueError(f'Bad encryption algorithm, should be {ALLOWED_ALGOS}, got: {algo}')
38
39 enc_key = safe_bytes(enc_key)
40 value = safe_bytes(value)
41 safe = not strict_mode
42
43 if algo == 'aes':
44 return validate_and_decrypt_data(value, enc_key, safe=safe)
45 if algo == 'fernet':
46 return Encryptor(enc_key).decrypt(value, safe=safe)
47
48 return value
@@ -1,138 +1,152 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 Generic encryption library for RhodeCode
24 24 """
25 25
26 26 import base64
27 import logging
27 28
28 29 from Crypto.Cipher import AES
29 30 from Crypto import Random
30 31 from Crypto.Hash import HMAC, SHA256
31 32
32 from rhodecode.lib.str_utils import safe_bytes
33
34
35 class SignatureVerificationError(Exception):
36 pass
33 from rhodecode.lib.str_utils import safe_bytes, safe_str
34 from rhodecode.lib.exceptions import signature_verification_error
37 35
38 36
39 37 class InvalidDecryptedValue(str):
40 38
41 39 def __new__(cls, content):
42 40 """
43 41 This will generate something like this::
44 42 <InvalidDecryptedValue(QkWusFgLJXR6m42v...)>
45 43 And represent a safe indicator that encryption key is broken
46 44 """
47 45 content = '<{}({}...)>'.format(cls.__name__, content[:16])
48 46 return str.__new__(cls, content)
49 47
48 KEY_FORMAT = b'enc$aes_hmac${1}'
49
50 50
51 51 class AESCipher(object):
52 def __init__(self, key, hmac=False, strict_verification=True):
52
53 def __init__(self, key: bytes, hmac=False, strict_verification=True):
54
53 55 if not key:
54 56 raise ValueError('passed key variable is empty')
55 57 self.strict_verification = strict_verification
56 58 self.block_size = 32
57 59 self.hmac_size = 32
58 60 self.hmac = hmac
59 61
60 62 self.key = SHA256.new(safe_bytes(key)).digest()
61 63 self.hmac_key = SHA256.new(self.key).digest()
62 64
63 65 def verify_hmac_signature(self, raw_data):
64 66 org_hmac_signature = raw_data[-self.hmac_size:]
65 67 data_without_sig = raw_data[:-self.hmac_size]
66 68 recomputed_hmac = HMAC.new(
67 69 self.hmac_key, data_without_sig, digestmod=SHA256).digest()
68 70 return org_hmac_signature == recomputed_hmac
69 71
70 def encrypt(self, raw):
72 def encrypt(self, raw: bytes):
71 73 raw = self._pad(raw)
72 74 iv = Random.new().read(AES.block_size)
73 75 cipher = AES.new(self.key, AES.MODE_CBC, iv)
74 76 enc_value = cipher.encrypt(raw)
75 77
76 hmac_signature = ''
78 hmac_signature = b''
77 79 if self.hmac:
78 80 # compute hmac+sha256 on iv + enc text, we use
79 81 # encrypt then mac method to create the signature
80 82 hmac_signature = HMAC.new(
81 83 self.hmac_key, iv + enc_value, digestmod=SHA256).digest()
82 84
83 85 return base64.b64encode(iv + enc_value + hmac_signature)
84 86
85 def decrypt(self, enc):
87 def decrypt(self, enc, safe=True) -> bytes | InvalidDecryptedValue:
86 88 enc_org = enc
87 enc = base64.b64decode(enc)
89 try:
90 enc = base64.b64decode(enc)
91 except Exception:
92 logging.exception('Failed Base64 decode')
93 raise signature_verification_error('Failed Base64 decode')
88 94
89 95 if self.hmac and len(enc) > self.hmac_size:
90 96 if self.verify_hmac_signature(enc):
91 97 # cut off the HMAC verification digest
92 98 enc = enc[:-self.hmac_size]
93 99 else:
94 if self.strict_verification:
95 raise SignatureVerificationError(
96 "Encryption signature verification failed. "
97 "Please check your secret key, and/or encrypted value. "
98 "Secret key is stored as "
99 "`rhodecode.encrypted_values.secret` or "
100 "`beaker.session.secret` inside .ini file")
101 100
102 return InvalidDecryptedValue(enc_org)
101 decrypt_fail = InvalidDecryptedValue(safe_str(enc_org))
102 if safe:
103 return decrypt_fail
104 raise signature_verification_error(decrypt_fail)
103 105
104 106 iv = enc[:AES.block_size]
105 107 cipher = AES.new(self.key, AES.MODE_CBC, iv)
106 108 return self._unpad(cipher.decrypt(enc[AES.block_size:]))
107 109
108 110 def _pad(self, s):
109 return (s + (self.block_size - len(s) % self.block_size)
110 * chr(self.block_size - len(s) % self.block_size))
111 block_pad = (self.block_size - len(s) % self.block_size)
112 return s + block_pad * safe_bytes(chr(block_pad))
111 113
112 114 @staticmethod
113 115 def _unpad(s):
114 116 return s[:-ord(s[len(s)-1:])]
115 117
116 118
117 def validate_and_get_enc_data(enc_data, enc_key, enc_strict_mode):
119 def validate_and_decrypt_data(enc_data, enc_key, enc_strict_mode=False, safe=True):
120 enc_data = safe_str(enc_data)
121
118 122 parts = enc_data.split('$', 3)
119 if not len(parts) == 3:
120 # probably not encrypted values
123 if len(parts) != 3:
124 raise ValueError(f'Encrypted Data has invalid format, expected {KEY_FORMAT}, got {parts}')
125
126 enc_type = parts[1]
127 enc_data_part = parts[2]
128
129 if parts[0] != 'enc':
130 # parts ok but without our header ?
121 131 return enc_data
122 else:
123 if parts[0] != 'enc':
124 # parts ok but without our header ?
125 return enc_data
126 132
127 # at that stage we know it's our encryption
128 if parts[1] == 'aes':
129 decrypted_data = AESCipher(enc_key).decrypt(parts[2])
130 elif parts[1] == 'aes_hmac':
131 decrypted_data = AESCipher(
132 enc_key, hmac=True,
133 strict_verification=enc_strict_mode).decrypt(parts[2])
134 else:
135 raise ValueError(
136 'Encryption type part is wrong, must be `aes` '
137 'or `aes_hmac`, got `%s` instead' % (parts[1]))
138 return decrypted_data
133 # at that stage we know it's our encryption
134 if enc_type == 'aes':
135 decrypted_data = AESCipher(enc_key).decrypt(enc_data_part, safe=safe)
136 elif enc_type == 'aes_hmac':
137 decrypted_data = AESCipher(
138 enc_key, hmac=True,
139 strict_verification=enc_strict_mode).decrypt(enc_data_part, safe=safe)
140
141 else:
142 raise ValueError(
143 f'Encryption type part is wrong, must be `aes` '
144 f'or `aes_hmac`, got `{enc_type}` instead')
145
146 return decrypted_data
147
148
149 def encrypt_data(data, enc_key: bytes):
150 enc_key = safe_bytes(enc_key)
151 enc_value = AESCipher(enc_key, hmac=True).encrypt(safe_bytes(data))
152 return KEY_FORMAT.replace(b'{1}', enc_value)
@@ -1,69 +1,84 b''
1 1 import os
2 2 import base64
3 3 from cryptography.fernet import Fernet, InvalidToken
4 4 from cryptography.hazmat.backends import default_backend
5 5 from cryptography.hazmat.primitives import hashes
6 6 from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
7 7
8 from rhodecode.lib.str_utils import safe_str
9 from rhodecode.lib.exceptions import signature_verification_error
10
11
12 class InvalidDecryptedValue(str):
13
14 def __new__(cls, content):
15 """
16 This will generate something like this::
17 <InvalidDecryptedValue(QkWusFgLJXR6m42v...)>
18 And represent a safe indicator that encryption key is broken
19 """
20 content = '<{}({}...)>'.format(cls.__name__, content[:16])
21 return str.__new__(cls, content)
22
8 23
9 24 class Encryptor(object):
10 key_format = 'enc2$salt:{}$data:{}'
25 key_format = b'enc2$salt:{1}$data:{2}'
11 26 pref_len = 5 # salt:, data:
12 27
13 def __init__(self, enc_key):
28 def __init__(self, enc_key: bytes):
14 29 self.enc_key = enc_key
15 30
16 31 def b64_encode(self, data):
17 32 return base64.urlsafe_b64encode(data)
18 33
19 34 def b64_decode(self, data):
20 35 return base64.urlsafe_b64decode(data)
21 36
22 37 def get_encryptor(self, salt):
23 38 """
24 39 Uses Fernet as encryptor with HMAC signature
25 40 :param salt: random salt used for encrypting the data
26 41 """
27 42 kdf = PBKDF2HMAC(
28 43 algorithm=hashes.SHA512(),
29 44 length=32,
30 45 salt=salt,
31 46 iterations=100000,
32 47 backend=default_backend()
33 48 )
34 49 key = self.b64_encode(kdf.derive(self.enc_key))
35 50 return Fernet(key)
36 51
37 52 def _get_parts(self, enc_data):
38 parts = enc_data.split('$', 3)
53 parts = enc_data.split(b'$', 3)
39 54 if len(parts) != 3:
40 raise ValueError('Encrypted Data has invalid format, expected {}'.format(self.key_format))
55 raise ValueError(f'Encrypted Data has invalid format, expected {self.key_format}, got {parts}')
41 56 prefix, salt, enc_data = parts
42 57
43 58 try:
44 59 salt = self.b64_decode(salt[self.pref_len:])
45 60 except TypeError:
46 61 # bad base64
47 62 raise ValueError('Encrypted Data salt invalid format, expected base64 format')
48 63
49 64 enc_data = enc_data[self.pref_len:]
50 65 return prefix, salt, enc_data
51 66
52 def encrypt(self, data):
67 def encrypt(self, data) -> bytes:
53 68 salt = os.urandom(64)
54 69 encryptor = self.get_encryptor(salt)
55 70 enc_data = encryptor.encrypt(data)
56 return self.key_format.format(self.b64_encode(salt), enc_data)
71 return self.key_format.replace(b'{1}', self.b64_encode(salt)).replace(b'{2}', enc_data)
57 72
58 def decrypt(self, data, safe=True):
73 def decrypt(self, data, safe=True) -> bytes | InvalidDecryptedValue:
59 74 parts = self._get_parts(data)
60 75 salt = parts[1]
61 76 enc_data = parts[2]
62 77 encryptor = self.get_encryptor(salt)
63 78 try:
64 79 return encryptor.decrypt(enc_data)
65 80 except (InvalidToken,):
81 decrypt_fail = InvalidDecryptedValue(safe_str(data))
66 82 if safe:
67 return ''
68 else:
69 raise
83 return decrypt_fail
84 raise signature_verification_error(decrypt_fail)
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now