encryption: Implement a slightly improved AesCipher encryption....
marcink -
r281:f41dae1c default
Not Reviewed
Show More
Add another comment
TODOs: 0 unresolved 0 Resolved
COMMENTS: 0 General 0 Inline
@@ -119,6 +119,10
119 ## `beaker.session.secret`
119 ## `beaker.session.secret`
120 #rhodecode.encrypted_values.secret =
120 #rhodecode.encrypted_values.secret =
121
121
122 ## decryption strict mode (enabled by default). It controls if decryption raises
123 ## `SignatureVerificationError` in case of wrong key, or damaged encryption data.
124 #rhodecode.encrypted_values.strict = false
125
122 full_stack = true
126 full_stack = true
123
127
124 ## Serve static files via RhodeCode, disable to serve them via HTTP server
128 ## Serve static files via RhodeCode, disable to serve them via HTTP server
@@ -93,6 +93,10
93 ## `beaker.session.secret`
93 ## `beaker.session.secret`
94 #rhodecode.encrypted_values.secret =
94 #rhodecode.encrypted_values.secret =
95
95
96 ## decryption strict mode (enabled by default). It controls if decryption raises
97 ## `SignatureVerificationError` in case of wrong key, or damaged encryption data.
98 #rhodecode.encrypted_values.strict = false
99
96 full_stack = true
100 full_stack = true
97
101
98 ## Serve static files via RhodeCode, disable to serve them via HTTP server
102 ## Serve static files via RhodeCode, disable to serve them via HTTP server
@@ -23,31 +23,84
23 Generic encryption library for RhodeCode
23 Generic encryption library for RhodeCode
24 """
24 """
25
25
26 import hashlib
27 import base64
26 import base64
28
27
29 from Crypto.Cipher import AES
28 from Crypto.Cipher import AES
30 from Crypto import Random
29 from Crypto import Random
30 from Crypto.Hash import HMAC, SHA256
31
31
32 from rhodecode.lib.utils2 import safe_str
32 from rhodecode.lib.utils2 import safe_str
33
33
34
34
35 class SignatureVerificationError(Exception):
36 pass
37
38
39 class InvalidDecryptedValue(str):
40
41 def __new__(cls, content):
42 """
43 This will generate something like this::
44 <InvalidDecryptedValue(QkWusFgLJXR6m42v...)>
45 And represent a safe indicator that encryption key is broken
46 """
47 content = '<{}({}...)>'.format(cls.__name__, content[:16])
48 return str.__new__(cls, content)
49
50
35 class AESCipher(object):
51 class AESCipher(object):
36 def __init__(self, key):
52 def __init__(self, key, hmac=False, strict_verification=True):
37 # create padding, trim to long enc key
38 if not key:
53 if not key:
39 raise ValueError('passed key variable is empty')
54 raise ValueError('passed key variable is empty')
55 self.strict_verification = strict_verification
40 self.block_size = 32
56 self.block_size = 32
41 self.key = hashlib.sha256(safe_str(key)).digest()
57 self.hmac_size = 32
58 self.hmac = hmac
59
60 self.key = SHA256.new(safe_str(key)).digest()
61 self.hmac_key = SHA256.new(self.key).digest()
62
63 def verify_hmac_signature(self, raw_data):
64 org_hmac_signature = raw_data[-self.hmac_size:]
65 data_without_sig = raw_data[:-self.hmac_size]
66 recomputed_hmac = HMAC.new(
67 self.hmac_key, data_without_sig, digestmod=SHA256).digest()
68 return org_hmac_signature == recomputed_hmac
42
69
43 def encrypt(self, raw):
70 def encrypt(self, raw):
44 raw = self._pad(raw)
71 raw = self._pad(raw)
45 iv = Random.new().read(AES.block_size)
72 iv = Random.new().read(AES.block_size)
46 cipher = AES.new(self.key, AES.MODE_CBC, iv)
73 cipher = AES.new(self.key, AES.MODE_CBC, iv)
47 return base64.b64encode(iv + cipher.encrypt(raw))
74 enc_value = cipher.encrypt(raw)
75
76 hmac_signature = ''
77 if self.hmac:
78 # compute hmac+sha256 on iv + enc text, we use
79 # encrypt then mac method to create the signature
80 hmac_signature = HMAC.new(
81 self.hmac_key, iv + enc_value, digestmod=SHA256).digest()
82
83 return base64.b64encode(iv + enc_value + hmac_signature)
48
84
49 def decrypt(self, enc):
85 def decrypt(self, enc):
86 enc_org = enc
50 enc = base64.b64decode(enc)
87 enc = base64.b64decode(enc)
88
89 if self.hmac and len(enc) > self.hmac_size:
90 if self.verify_hmac_signature(enc):
91 # cut off the HMAC verification digest
92 enc = enc[:-self.hmac_size]
93 else:
94 if self.strict_verification:
95 raise SignatureVerificationError(
96 "Encryption signature verification failed. "
97 "Please check your secret key, and/or encrypted value. "
98 "Secret key is stored as "
99 "`rhodecode.encrypted_values.secret` or "
100 "`beaker.session.secret` inside .ini file")
101
102 return InvalidDecryptedValue(enc_org)
103
51 iv = enc[:AES.block_size]
104 iv = enc[:AES.block_size]
52 cipher = AES.new(self.key, AES.MODE_CBC, iv)
105 cipher = AES.new(self.key, AES.MODE_CBC, iv)
53 return self._unpad(cipher.decrypt(enc[AES.block_size:]))
106 return self._unpad(cipher.decrypt(enc[AES.block_size:]))
@@ -70,7 +70,8
70 # BASE CLASSES
70 # BASE CLASSES
71 # =============================================================================
71 # =============================================================================
72
72
73 # this is propagated from .ini file beaker.session.secret
73 # this is propagated from .ini file rhodecode.encrypted_values.secret or
74 # beaker.session.secret if first is not set.
74 # and initialized at environment.py
75 # and initialized at environment.py
75 ENCRYPTION_KEY = None
76 ENCRYPTION_KEY = None
76
77
@@ -115,14 +116,17
115 def process_bind_param(self, value, dialect):
116 def process_bind_param(self, value, dialect):
116 if not value:
117 if not value:
117 return value
118 return value
118 if value.startswith('enc$aes$'):
119 if value.startswith('enc$aes$') or value.startswith('enc$aes_hmac$'):
119 # protect against double encrypting if someone manually starts
120 # protect against double encrypting if someone manually starts
120 # doing
121 # doing
121 raise ValueError('value needs to be in unencrypted format, ie. '
122 raise ValueError('value needs to be in unencrypted format, ie. '
122 'not starting with enc$aes$')
123 'not starting with enc$aes')
123 return 'enc$aes$%s' % AESCipher(ENCRYPTION_KEY).encrypt(value)
124 return 'enc$aes_hmac$%s' % AESCipher(
125 ENCRYPTION_KEY, hmac=True).encrypt(value)
124
126
125 def process_result_value(self, value, dialect):
127 def process_result_value(self, value, dialect):
128 import rhodecode
129
126 if not value:
130 if not value:
127 return value
131 return value
128
132
@@ -134,9 +138,19
134 if parts[0] != 'enc':
138 if parts[0] != 'enc':
135 # parts ok but without our header ?
139 # parts ok but without our header ?
136 return value
140 return value
137
141 enc_strict_mode = str2bool(rhodecode.CONFIG.get(
142 'rhodecode.encrypted_values.strict') or True)
138 # at that stage we know it's our encryption
143 # at that stage we know it's our encryption
139 decrypted_data = AESCipher(ENCRYPTION_KEY).decrypt(parts[2])
144 if parts[1] == 'aes':
145 decrypted_data = AESCipher(ENCRYPTION_KEY).decrypt(parts[2])
146 elif parts[1] == 'aes_hmac':
147 decrypted_data = AESCipher(
148 ENCRYPTION_KEY, hmac=True,
149 strict_verification=enc_strict_mode).decrypt(parts[2])
150 else:
151 raise ValueError(
152 'Encryption type part is wrong, must be `aes` '
153 'or `aes_hmac`, got `%s` instead' % (parts[1]))
140 return decrypted_data
154 return decrypted_data
141
155
142
156
@@ -1754,7 +1768,7
1754 clone_uri = self.clone_uri
1768 clone_uri = self.clone_uri
1755 if clone_uri:
1769 if clone_uri:
1756 import urlobject
1770 import urlobject
1757 url_obj = urlobject.URLObject(self.clone_uri)
1771 url_obj = urlobject.URLObject(clone_uri)
1758 if url_obj.password:
1772 if url_obj.password:
1759 clone_uri = url_obj.with_password('*****')
1773 clone_uri = url_obj.with_password('*****')
1760 return clone_uri
1774 return clone_uri
@@ -195,7 +195,7
195 %if repo_instance.clone_uri:
195 %if repo_instance.clone_uri:
196 <p>
196 <p>
197 <i class="icon-code-fork"></i> ${_('Clone from')}
197 <i class="icon-code-fork"></i> ${_('Clone from')}
198 <a href="${h.url(str(h.hide_credentials(repo_instance.clone_uri)))}">${h.hide_credentials(repo_instance.clone_uri)}</a>
198 <a href="${h.url(h.safe_str(h.hide_credentials(repo_instance.clone_uri)))}">${h.hide_credentials(repo_instance.clone_uri)}</a>
199 </p>
199 </p>
200 %endif
200 %endif
201
201
@@ -20,7 +20,8
20
20
21 import pytest
21 import pytest
22
22
23 from rhodecode.lib.encrypt import AESCipher
23 from rhodecode.lib.encrypt import (
24 AESCipher, SignatureVerificationError, InvalidDecryptedValue)
24
25
25
26
26 class TestEncryptModule(object):
27 class TestEncryptModule(object):
@@ -38,3 +39,38
38 def test_encryption(self, key, text):
39 def test_encryption(self, key, text):
39 enc = AESCipher(key).encrypt(text)
40 enc = AESCipher(key).encrypt(text)
40 assert AESCipher(key).decrypt(enc) == text
41 assert AESCipher(key).decrypt(enc) == text
42
43 def test_encryption_with_hmac(self):
44 key = 'secret'
45 text = 'ihatemysql'
46 enc = AESCipher(key, hmac=True).encrypt(text)
47 assert AESCipher(key, hmac=True).decrypt(enc) == text
48
49 def test_encryption_with_hmac_with_bad_key(self):
50 key = 'secretstring'
51 text = 'ihatemysql'
52 enc = AESCipher(key, hmac=True).encrypt(text)
53
54 with pytest.raises(SignatureVerificationError) as e:
55 assert AESCipher('differentsecret', hmac=True).decrypt(enc) == ''
56
57 assert 'Encryption signature verification failed' in str(e)
58
59 def test_encryption_with_hmac_with_bad_data(self):
60 key = 'secret'
61 text = 'ihatemysql'
62 enc = AESCipher(key, hmac=True).encrypt(text)
63 enc = 'xyz' + enc[3:]
64 with pytest.raises(SignatureVerificationError) as e:
65 assert AESCipher(key, hmac=True).decrypt(enc) == text
66
67 assert 'Encryption signature verification failed' in str(e)
68
69 def test_encryption_with_hmac_with_bad_key_not_strict(self):
70 key = 'secretstring'
71 text = 'ihatemysql'
72 enc = AESCipher(key, hmac=True).encrypt(text)
73
74 assert isinstance(AESCipher(
75 'differentsecret', hmac=True, strict_verification=False
76 ).decrypt(enc), InvalidDecryptedValue)
Comments 0
You need to be logged in to leave comments. Login now