##// END OF EJS Templates
encryption: Implement a slightly improved AesCipher encryption....
marcink -
r281:f41dae1c default
parent child Browse files
Show More
@@ -119,6 +119,10 b' rhodecode.api.url = /_admin/api'
119 119 ## `beaker.session.secret`
120 120 #rhodecode.encrypted_values.secret =
121 121
122 ## decryption strict mode (enabled by default). It controls if decryption raises
123 ## `SignatureVerificationError` in case of wrong key, or damaged encryption data.
124 #rhodecode.encrypted_values.strict = false
125
122 126 full_stack = true
123 127
124 128 ## Serve static files via RhodeCode, disable to serve them via HTTP server
@@ -93,6 +93,10 b' use = egg:rhodecode-enterprise-ce'
93 93 ## `beaker.session.secret`
94 94 #rhodecode.encrypted_values.secret =
95 95
96 ## decryption strict mode (enabled by default). It controls if decryption raises
97 ## `SignatureVerificationError` in case of wrong key, or damaged encryption data.
98 #rhodecode.encrypted_values.strict = false
99
96 100 full_stack = true
97 101
98 102 ## Serve static files via RhodeCode, disable to serve them via HTTP server
@@ -23,31 +23,84 b''
23 23 Generic encryption library for RhodeCode
24 24 """
25 25
26 import hashlib
27 26 import base64
28 27
29 28 from Crypto.Cipher import AES
30 29 from Crypto import Random
30 from Crypto.Hash import HMAC, SHA256
31 31
32 32 from rhodecode.lib.utils2 import safe_str
33 33
34 34
35 class SignatureVerificationError(Exception):
36 pass
37
38
39 class InvalidDecryptedValue(str):
40
41 def __new__(cls, content):
42 """
43 This will generate something like this::
44 <InvalidDecryptedValue(QkWusFgLJXR6m42v...)>
45 And represent a safe indicator that encryption key is broken
46 """
47 content = '<{}({}...)>'.format(cls.__name__, content[:16])
48 return str.__new__(cls, content)
49
50
35 51 class AESCipher(object):
36 def __init__(self, key):
37 # create padding, trim to long enc key
52 def __init__(self, key, hmac=False, strict_verification=True):
38 53 if not key:
39 54 raise ValueError('passed key variable is empty')
55 self.strict_verification = strict_verification
40 56 self.block_size = 32
41 self.key = hashlib.sha256(safe_str(key)).digest()
57 self.hmac_size = 32
58 self.hmac = hmac
59
60 self.key = SHA256.new(safe_str(key)).digest()
61 self.hmac_key = SHA256.new(self.key).digest()
62
63 def verify_hmac_signature(self, raw_data):
64 org_hmac_signature = raw_data[-self.hmac_size:]
65 data_without_sig = raw_data[:-self.hmac_size]
66 recomputed_hmac = HMAC.new(
67 self.hmac_key, data_without_sig, digestmod=SHA256).digest()
68 return org_hmac_signature == recomputed_hmac
42 69
43 70 def encrypt(self, raw):
44 71 raw = self._pad(raw)
45 72 iv = Random.new().read(AES.block_size)
46 73 cipher = AES.new(self.key, AES.MODE_CBC, iv)
47 return base64.b64encode(iv + cipher.encrypt(raw))
74 enc_value = cipher.encrypt(raw)
75
76 hmac_signature = ''
77 if self.hmac:
78 # compute hmac+sha256 on iv + enc text, we use
79 # encrypt then mac method to create the signature
80 hmac_signature = HMAC.new(
81 self.hmac_key, iv + enc_value, digestmod=SHA256).digest()
82
83 return base64.b64encode(iv + enc_value + hmac_signature)
48 84
49 85 def decrypt(self, enc):
86 enc_org = enc
50 87 enc = base64.b64decode(enc)
88
89 if self.hmac and len(enc) > self.hmac_size:
90 if self.verify_hmac_signature(enc):
91 # cut off the HMAC verification digest
92 enc = enc[:-self.hmac_size]
93 else:
94 if self.strict_verification:
95 raise SignatureVerificationError(
96 "Encryption signature verification failed. "
97 "Please check your secret key, and/or encrypted value. "
98 "Secret key is stored as "
99 "`rhodecode.encrypted_values.secret` or "
100 "`beaker.session.secret` inside .ini file")
101
102 return InvalidDecryptedValue(enc_org)
103
51 104 iv = enc[:AES.block_size]
52 105 cipher = AES.new(self.key, AES.MODE_CBC, iv)
53 106 return self._unpad(cipher.decrypt(enc[AES.block_size:]))
@@ -70,7 +70,8 b' log = logging.getLogger(__name__)'
70 70 # BASE CLASSES
71 71 # =============================================================================
72 72
73 # this is propagated from .ini file beaker.session.secret
73 # this is propagated from .ini file rhodecode.encrypted_values.secret or
74 # beaker.session.secret if first is not set.
74 75 # and initialized at environment.py
75 76 ENCRYPTION_KEY = None
76 77
@@ -115,14 +116,17 b' class EncryptedTextValue(TypeDecorator):'
115 116 def process_bind_param(self, value, dialect):
116 117 if not value:
117 118 return value
118 if value.startswith('enc$aes$'):
119 if value.startswith('enc$aes$') or value.startswith('enc$aes_hmac$'):
119 120 # protect against double encrypting if someone manually starts
120 121 # doing
121 122 raise ValueError('value needs to be in unencrypted format, ie. '
122 'not starting with enc$aes$')
123 return 'enc$aes$%s' % AESCipher(ENCRYPTION_KEY).encrypt(value)
123 'not starting with enc$aes')
124 return 'enc$aes_hmac$%s' % AESCipher(
125 ENCRYPTION_KEY, hmac=True).encrypt(value)
124 126
125 127 def process_result_value(self, value, dialect):
128 import rhodecode
129
126 130 if not value:
127 131 return value
128 132
@@ -134,9 +138,19 b' class EncryptedTextValue(TypeDecorator):'
134 138 if parts[0] != 'enc':
135 139 # parts ok but without our header ?
136 140 return value
137
141 enc_strict_mode = str2bool(rhodecode.CONFIG.get(
142 'rhodecode.encrypted_values.strict') or True)
138 143 # at that stage we know it's our encryption
144 if parts[1] == 'aes':
139 145 decrypted_data = AESCipher(ENCRYPTION_KEY).decrypt(parts[2])
146 elif parts[1] == 'aes_hmac':
147 decrypted_data = AESCipher(
148 ENCRYPTION_KEY, hmac=True,
149 strict_verification=enc_strict_mode).decrypt(parts[2])
150 else:
151 raise ValueError(
152 'Encryption type part is wrong, must be `aes` '
153 'or `aes_hmac`, got `%s` instead' % (parts[1]))
140 154 return decrypted_data
141 155
142 156
@@ -1754,7 +1768,7 b' class Repository(Base, BaseModel):'
1754 1768 clone_uri = self.clone_uri
1755 1769 if clone_uri:
1756 1770 import urlobject
1757 url_obj = urlobject.URLObject(self.clone_uri)
1771 url_obj = urlobject.URLObject(clone_uri)
1758 1772 if url_obj.password:
1759 1773 clone_uri = url_obj.with_password('*****')
1760 1774 return clone_uri
@@ -195,7 +195,7 b''
195 195 %if repo_instance.clone_uri:
196 196 <p>
197 197 <i class="icon-code-fork"></i> ${_('Clone from')}
198 <a href="${h.url(str(h.hide_credentials(repo_instance.clone_uri)))}">${h.hide_credentials(repo_instance.clone_uri)}</a>
198 <a href="${h.url(h.safe_str(h.hide_credentials(repo_instance.clone_uri)))}">${h.hide_credentials(repo_instance.clone_uri)}</a>
199 199 </p>
200 200 %endif
201 201
@@ -20,7 +20,8 b''
20 20
21 21 import pytest
22 22
23 from rhodecode.lib.encrypt import AESCipher
23 from rhodecode.lib.encrypt import (
24 AESCipher, SignatureVerificationError, InvalidDecryptedValue)
24 25
25 26
26 27 class TestEncryptModule(object):
@@ -38,3 +39,38 b' class TestEncryptModule(object):'
38 39 def test_encryption(self, key, text):
39 40 enc = AESCipher(key).encrypt(text)
40 41 assert AESCipher(key).decrypt(enc) == text
42
43 def test_encryption_with_hmac(self):
44 key = 'secret'
45 text = 'ihatemysql'
46 enc = AESCipher(key, hmac=True).encrypt(text)
47 assert AESCipher(key, hmac=True).decrypt(enc) == text
48
49 def test_encryption_with_hmac_with_bad_key(self):
50 key = 'secretstring'
51 text = 'ihatemysql'
52 enc = AESCipher(key, hmac=True).encrypt(text)
53
54 with pytest.raises(SignatureVerificationError) as e:
55 assert AESCipher('differentsecret', hmac=True).decrypt(enc) == ''
56
57 assert 'Encryption signature verification failed' in str(e)
58
59 def test_encryption_with_hmac_with_bad_data(self):
60 key = 'secret'
61 text = 'ihatemysql'
62 enc = AESCipher(key, hmac=True).encrypt(text)
63 enc = 'xyz' + enc[3:]
64 with pytest.raises(SignatureVerificationError) as e:
65 assert AESCipher(key, hmac=True).decrypt(enc) == text
66
67 assert 'Encryption signature verification failed' in str(e)
68
69 def test_encryption_with_hmac_with_bad_key_not_strict(self):
70 key = 'secretstring'
71 text = 'ihatemysql'
72 enc = AESCipher(key, hmac=True).encrypt(text)
73
74 assert isinstance(AESCipher(
75 'differentsecret', hmac=True, strict_verification=False
76 ).decrypt(enc), InvalidDecryptedValue)
General Comments 0
You need to be logged in to leave comments. Login now