##// END OF EJS Templates
encryption: added new backend using cryptography + Fernet encryption....
marcink -
r3522:3910c057 default
parent child Browse files
Show More
@@ -0,0 +1,69 b''
1 import os
2 import base64
3 from cryptography.fernet import Fernet, InvalidToken
4 from cryptography.hazmat.backends import default_backend
5 from cryptography.hazmat.primitives import hashes
6 from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
7
8
9 class Encryptor(object):
10 key_format = 'enc2$salt:{}$data:{}'
11 pref_len = 5 # salt:, data:
12
13 def __init__(self, enc_key):
14 self.enc_key = enc_key
15
16 def b64_encode(self, data):
17 return base64.urlsafe_b64encode(data)
18
19 def b64_decode(self, data):
20 return base64.urlsafe_b64decode(data)
21
22 def get_encryptor(self, salt):
23 """
24 Uses Fernet as encryptor with HMAC signature
25 :param salt: random salt used for encrypting the data
26 """
27 kdf = PBKDF2HMAC(
28 algorithm=hashes.SHA512(),
29 length=32,
30 salt=salt,
31 iterations=100000,
32 backend=default_backend()
33 )
34 key = self.b64_encode(kdf.derive(self.enc_key))
35 return Fernet(key)
36
37 def _get_parts(self, enc_data):
38 parts = enc_data.split('$', 3)
39 if len(parts) != 3:
40 raise ValueError('Encrypted Data has invalid format, expected {}'.format(self.key_format))
41 prefix, salt, enc_data = parts
42
43 try:
44 salt = self.b64_decode(salt[self.pref_len:])
45 except TypeError:
46 # bad base64
47 raise ValueError('Encrypted Data salt invalid format, expected base64 format')
48
49 enc_data = enc_data[self.pref_len:]
50 return prefix, salt, enc_data
51
52 def encrypt(self, data):
53 salt = os.urandom(64)
54 encryptor = self.get_encryptor(salt)
55 enc_data = encryptor.encrypt(data)
56 return self.key_format.format(self.b64_encode(salt), enc_data)
57
58 def decrypt(self, data, safe=True):
59 parts = self._get_parts(data)
60 salt = parts[1]
61 enc_data = parts[2]
62 encryptor = self.get_encryptor(salt)
63 try:
64 return encryptor.decrypt(enc_data)
65 except (InvalidToken,):
66 if safe:
67 return ''
68 else:
69 raise
@@ -133,6 +133,11 b' rhodecode.api.url = /_admin/api'
133 133 ## `SignatureVerificationError` in case of wrong key, or damaged encryption data.
134 134 #rhodecode.encrypted_values.strict = false
135 135
136 ## Pick algorithm for encryption. Either fernet (more secure) or aes (default)
137 ## fernet is safer, and we strongly recommend switching to it.
138 ## Due to backward compatibility aes is used as default.
139 #rhodecode.encrypted_values.algorithm = fernet
140
136 141 ## return gzipped responses from RhodeCode (static files/application)
137 142 gzip_responses = false
138 143
@@ -108,6 +108,11 b' use = egg:rhodecode-enterprise-ce'
108 108 ## `SignatureVerificationError` in case of wrong key, or damaged encryption data.
109 109 #rhodecode.encrypted_values.strict = false
110 110
111 ## Pick algorithm for encryption. Either fernet (more secure) or aes (default)
112 ## fernet is safer, and we strongly recommend switching to it.
113 ## Due to backward compatibility aes is used as default.
114 #rhodecode.encrypted_values.algorithm = fernet
115
111 116 ## return gzipped responses from RhodeCode (static files/application)
112 117 gzip_responses = false
113 118
@@ -111,4 +111,28 b' class AESCipher(object):'
111 111
112 112 @staticmethod
113 113 def _unpad(s):
114 return s[:-ord(s[len(s)-1:])] No newline at end of file
114 return s[:-ord(s[len(s)-1:])]
115
116
117 def validate_and_get_enc_data(enc_data, enc_key, enc_strict_mode):
118 parts = enc_data.split('$', 3)
119 if not len(parts) == 3:
120 # probably not encrypted values
121 return enc_data
122 else:
123 if parts[0] != 'enc':
124 # parts ok but without our header ?
125 return enc_data
126
127 # at that stage we know it's our encryption
128 if parts[1] == 'aes':
129 decrypted_data = AESCipher(enc_key).decrypt(parts[2])
130 elif parts[1] == 'aes_hmac':
131 decrypted_data = AESCipher(
132 enc_key, hmac=True,
133 strict_verification=enc_strict_mode).decrypt(parts[2])
134 else:
135 raise ValueError(
136 'Encryption type part is wrong, must be `aes` '
137 'or `aes_hmac`, got `%s` instead' % (parts[1]))
138 return decrypted_data
@@ -62,8 +62,8 b' from rhodecode.lib.jsonalchemy import Mu'
62 62 JsonRaw
63 63 from rhodecode.lib.ext_json import json
64 64 from rhodecode.lib.caching_query import FromCache
65 from rhodecode.lib.encrypt import AESCipher
66
65 from rhodecode.lib.encrypt import AESCipher, validate_and_get_enc_data
66 from rhodecode.lib.encrypt2 import Encryptor
67 67 from rhodecode.model.meta import Base, Session
68 68
69 69 URL_SEP = '/'
@@ -159,43 +159,45 b' class EncryptedTextValue(TypeDecorator):'
159 159 impl = Text
160 160
161 161 def process_bind_param(self, value, dialect):
162 if not value:
163 return value
164 if value.startswith('enc$aes$') or value.startswith('enc$aes_hmac$'):
165 # protect against double encrypting if someone manually starts
166 # doing
167 raise ValueError('value needs to be in unencrypted format, ie. '
168 'not starting with enc$aes')
169 return 'enc$aes_hmac$%s' % AESCipher(
170 ENCRYPTION_KEY, hmac=True).encrypt(value)
171
172 def process_result_value(self, value, dialect):
162 """
163 Setter for storing value
164 """
173 165 import rhodecode
174
175 166 if not value:
176 167 return value
177 168
178 parts = value.split('$', 3)
179 if not len(parts) == 3:
180 # probably not encrypted values
181 return value
169 # protect against double encrypting if values is already encrypted
170 if value.startswith('enc$aes$') \
171 or value.startswith('enc$aes_hmac$') \
172 or value.startswith('enc2$'):
173 raise ValueError('value needs to be in unencrypted format, '
174 'ie. not starting with enc$ or enc2$')
175
176 algo = rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes'
177 if algo == 'aes':
178 return 'enc$aes_hmac$%s' % AESCipher(ENCRYPTION_KEY, hmac=True).encrypt(value)
179 elif algo == 'fernet':
180 return Encryptor(ENCRYPTION_KEY).encrypt(value)
182 181 else:
183 if parts[0] != 'enc':
184 # parts ok but without our header ?
182 ValueError('Bad encryption algorithm, should be fernet or aes, got: {}'.format(algo))
183
184 def process_result_value(self, value, dialect):
185 """
186 Getter for retrieving value
187 """
188
189 import rhodecode
190 if not value:
185 191 return value
186 enc_strict_mode = str2bool(rhodecode.CONFIG.get(
187 'rhodecode.encrypted_values.strict') or True)
188 # at that stage we know it's our encryption
189 if parts[1] == 'aes':
190 decrypted_data = AESCipher(ENCRYPTION_KEY).decrypt(parts[2])
191 elif parts[1] == 'aes_hmac':
192 decrypted_data = AESCipher(
193 ENCRYPTION_KEY, hmac=True,
194 strict_verification=enc_strict_mode).decrypt(parts[2])
192
193 algo = rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes'
194 enc_strict_mode = str2bool(rhodecode.CONFIG.get('rhodecode.encrypted_values.strict') or True)
195 if algo == 'aes':
196 decrypted_data = validate_and_get_enc_data(value, ENCRYPTION_KEY, enc_strict_mode)
197 elif algo == 'fernet':
198 return Encryptor(ENCRYPTION_KEY).decrypt(value)
195 199 else:
196 raise ValueError(
197 'Encryption type part is wrong, must be `aes` '
198 'or `aes_hmac`, got `%s` instead' % (parts[1]))
200 ValueError('Bad encryption algorithm, should be fernet or aes, got: {}'.format(algo))
199 201 return decrypted_data
200 202
201 203
@@ -22,6 +22,7 b' import pytest'
22 22
23 23 from rhodecode.lib.encrypt import (
24 24 AESCipher, SignatureVerificationError, InvalidDecryptedValue)
25 from rhodecode.lib.encrypt2 import (Encryptor, InvalidToken)
25 26
26 27
27 28 class TestEncryptModule(object):
@@ -74,3 +75,59 b' class TestEncryptModule(object):'
74 75 assert isinstance(AESCipher(
75 76 'differentsecret', hmac=True, strict_verification=False
76 77 ).decrypt(enc), InvalidDecryptedValue)
78
79
80 class TestEncryptModule2(object):
81
82 @pytest.mark.parametrize(
83 "key, text",
84 [
85 ('a', 'short'),
86 ('a'*64, 'too long(trimmed to 32)'),
87 ('a'*32, 'just enough'),
88 ('Δ…Δ‡Δ™Δ‡Δ™', 'non asci'),
89 ('$asa$asa', 'special $ used'),
90 ]
91 )
92 def test_encryption(self, key, text):
93 enc = Encryptor(key).encrypt(text)
94 assert Encryptor(key).decrypt(enc) == text
95
96 def test_encryption_with_bad_key(self):
97 key = 'secretstring'
98 text = 'ihatemysql'
99 enc = Encryptor(key).encrypt(text)
100
101 assert Encryptor('differentsecret').decrypt(enc) == ''
102
103 def test_encryption_with_bad_key_raises(self):
104 key = 'secretstring'
105 text = 'ihatemysql'
106 enc = Encryptor(key).encrypt(text)
107
108 with pytest.raises(InvalidToken) as e:
109 Encryptor('differentsecret').decrypt(enc, safe=False)
110
111 assert 'InvalidToken' in str(e)
112
113 def test_encryption_with_bad_format_data(self):
114 key = 'secret'
115 text = 'ihatemysql'
116 enc = Encryptor(key).encrypt(text)
117 enc = '$xyz' + enc[3:]
118
119 with pytest.raises(ValueError) as e:
120 Encryptor(key).decrypt(enc, safe=False)
121
122 assert 'Encrypted Data has invalid format' in str(e)
123
124 def test_encryption_with_bad_data(self):
125 key = 'secret'
126 text = 'ihatemysql'
127 enc = Encryptor(key).encrypt(text)
128 enc = enc[:-5]
129
130 with pytest.raises(InvalidToken) as e:
131 Encryptor(key).decrypt(enc, safe=False)
132
133 assert 'InvalidToken' in str(e)
General Comments 0
You need to be logged in to leave comments. Login now