# HG changeset patch # User Marcin Kuzminski # Date 2016-06-27 17:26:03 # Node ID f41dae1c0c923b468c4bde001bff397de4e25d2a # Parent 136f6d39db202a49255b298e38228e2cc23da04e encryption: Implement a slightly improved AESCipher encryption. This addresses issues from #4036 - we use a backward compatible cipher that can implement HMAC signature - with HMAC signature we can detect mangled data or wrong secret key - this allows more explicit errors than UnicodeDecode errors - added a new .ini value that controls strict mode diff --git a/configs/development.ini b/configs/development.ini --- a/configs/development.ini +++ b/configs/development.ini @@ -119,6 +119,10 @@ rhodecode.api.url = /_admin/api ## `beaker.session.secret` #rhodecode.encrypted_values.secret = +## decryption strict mode (enabled by default). It controls if decryption raises +## `SignatureVerificationError` in case of wrong key, or damaged encryption data. +#rhodecode.encrypted_values.strict = false + full_stack = true ## Serve static files via RhodeCode, disable to serve them via HTTP server diff --git a/configs/production.ini b/configs/production.ini --- a/configs/production.ini +++ b/configs/production.ini @@ -93,6 +93,10 @@ use = egg:rhodecode-enterprise-ce ## `beaker.session.secret` #rhodecode.encrypted_values.secret = +## decryption strict mode (enabled by default). It controls if decryption raises +## `SignatureVerificationError` in case of wrong key, or damaged encryption data. 
+#rhodecode.encrypted_values.strict = false + full_stack = true ## Serve static files via RhodeCode, disable to serve them via HTTP server diff --git a/rhodecode/lib/encrypt.py b/rhodecode/lib/encrypt.py --- a/rhodecode/lib/encrypt.py +++ b/rhodecode/lib/encrypt.py @@ -23,31 +23,84 @@ Generic encryption library for RhodeCode """ -import hashlib import base64 from Crypto.Cipher import AES from Crypto import Random +from Crypto.Hash import HMAC, SHA256 from rhodecode.lib.utils2 import safe_str +class SignatureVerificationError(Exception): + pass + + +class InvalidDecryptedValue(str): + + def __new__(cls, content): + """ + This will generate something like this:: + + And represent a safe indicator that encryption key is broken + """ + content = '<{}({}...)>'.format(cls.__name__, content[:16]) + return str.__new__(cls, content) + + class AESCipher(object): - def __init__(self, key): - # create padding, trim to long enc key + def __init__(self, key, hmac=False, strict_verification=True): if not key: raise ValueError('passed key variable is empty') + self.strict_verification = strict_verification self.block_size = 32 - self.key = hashlib.sha256(safe_str(key)).digest() + self.hmac_size = 32 + self.hmac = hmac + + self.key = SHA256.new(safe_str(key)).digest() + self.hmac_key = SHA256.new(self.key).digest() + + def verify_hmac_signature(self, raw_data): + org_hmac_signature = raw_data[-self.hmac_size:] + data_without_sig = raw_data[:-self.hmac_size] + recomputed_hmac = HMAC.new( + self.hmac_key, data_without_sig, digestmod=SHA256).digest() + return org_hmac_signature == recomputed_hmac def encrypt(self, raw): raw = self._pad(raw) iv = Random.new().read(AES.block_size) cipher = AES.new(self.key, AES.MODE_CBC, iv) - return base64.b64encode(iv + cipher.encrypt(raw)) + enc_value = cipher.encrypt(raw) + + hmac_signature = '' + if self.hmac: + # compute hmac+sha256 on iv + enc text, we use + # encrypt then mac method to create the signature + hmac_signature = HMAC.new( + 
self.hmac_key, iv + enc_value, digestmod=SHA256).digest() + + return base64.b64encode(iv + enc_value + hmac_signature) def decrypt(self, enc): + enc_org = enc enc = base64.b64decode(enc) + + if self.hmac and len(enc) > self.hmac_size: + if self.verify_hmac_signature(enc): + # cut off the HMAC verification digest + enc = enc[:-self.hmac_size] + else: + if self.strict_verification: + raise SignatureVerificationError( + "Encryption signature verification failed. " + "Please check your secret key, and/or encrypted value. " + "Secret key is stored as " + "`rhodecode.encrypted_values.secret` or " + "`beaker.session.secret` inside .ini file") + + return InvalidDecryptedValue(enc_org) + iv = enc[:AES.block_size] cipher = AES.new(self.key, AES.MODE_CBC, iv) return self._unpad(cipher.decrypt(enc[AES.block_size:])) diff --git a/rhodecode/model/db.py b/rhodecode/model/db.py --- a/rhodecode/model/db.py +++ b/rhodecode/model/db.py @@ -70,7 +70,8 @@ log = logging.getLogger(__name__) # BASE CLASSES # ============================================================================= -# this is propagated from .ini file beaker.session.secret +# this is propagated from .ini file rhodecode.encrypted_values.secret or +# beaker.session.secret if first is not set. # and initialized at environment.py ENCRYPTION_KEY = None @@ -115,14 +116,17 @@ class EncryptedTextValue(TypeDecorator): def process_bind_param(self, value, dialect): if not value: return value - if value.startswith('enc$aes$'): + if value.startswith('enc$aes$') or value.startswith('enc$aes_hmac$'): # protect against double encrypting if someone manually starts # doing raise ValueError('value needs to be in unencrypted format, ie. 
' - 'not starting with enc$aes$') - return 'enc$aes$%s' % AESCipher(ENCRYPTION_KEY).encrypt(value) + 'not starting with enc$aes') + return 'enc$aes_hmac$%s' % AESCipher( + ENCRYPTION_KEY, hmac=True).encrypt(value) def process_result_value(self, value, dialect): + import rhodecode + if not value: return value @@ -134,9 +138,19 @@ class EncryptedTextValue(TypeDecorator): if parts[0] != 'enc': # parts ok but without our header ? return value - + enc_strict_mode = str2bool(rhodecode.CONFIG.get( + 'rhodecode.encrypted_values.strict') or True) # at that stage we know it's our encryption - decrypted_data = AESCipher(ENCRYPTION_KEY).decrypt(parts[2]) + if parts[1] == 'aes': + decrypted_data = AESCipher(ENCRYPTION_KEY).decrypt(parts[2]) + elif parts[1] == 'aes_hmac': + decrypted_data = AESCipher( + ENCRYPTION_KEY, hmac=True, + strict_verification=enc_strict_mode).decrypt(parts[2]) + else: + raise ValueError( + 'Encryption type part is wrong, must be `aes` ' + 'or `aes_hmac`, got `%s` instead' % (parts[1])) return decrypted_data @@ -1754,7 +1768,7 @@ class Repository(Base, BaseModel): clone_uri = self.clone_uri if clone_uri: import urlobject - url_obj = urlobject.URLObject(self.clone_uri) + url_obj = urlobject.URLObject(clone_uri) if url_obj.password: clone_uri = url_obj.with_password('*****') return clone_uri diff --git a/rhodecode/templates/base/base.html b/rhodecode/templates/base/base.html --- a/rhodecode/templates/base/base.html +++ b/rhodecode/templates/base/base.html @@ -195,7 +195,7 @@ %if repo_instance.clone_uri:

${_('Clone from')} - ${h.hide_credentials(repo_instance.clone_uri)} + ${h.hide_credentials(repo_instance.clone_uri)}

%endif diff --git a/rhodecode/tests/lib/test_encrypt.py b/rhodecode/tests/lib/test_encrypt.py --- a/rhodecode/tests/lib/test_encrypt.py +++ b/rhodecode/tests/lib/test_encrypt.py @@ -20,7 +20,8 @@ import pytest -from rhodecode.lib.encrypt import AESCipher +from rhodecode.lib.encrypt import ( + AESCipher, SignatureVerificationError, InvalidDecryptedValue) class TestEncryptModule(object): @@ -38,3 +39,38 @@ class TestEncryptModule(object): def test_encryption(self, key, text): enc = AESCipher(key).encrypt(text) assert AESCipher(key).decrypt(enc) == text + + def test_encryption_with_hmac(self): + key = 'secret' + text = 'ihatemysql' + enc = AESCipher(key, hmac=True).encrypt(text) + assert AESCipher(key, hmac=True).decrypt(enc) == text + + def test_encryption_with_hmac_with_bad_key(self): + key = 'secretstring' + text = 'ihatemysql' + enc = AESCipher(key, hmac=True).encrypt(text) + + with pytest.raises(SignatureVerificationError) as e: + assert AESCipher('differentsecret', hmac=True).decrypt(enc) == '' + + assert 'Encryption signature verification failed' in str(e) + + def test_encryption_with_hmac_with_bad_data(self): + key = 'secret' + text = 'ihatemysql' + enc = AESCipher(key, hmac=True).encrypt(text) + enc = 'xyz' + enc[3:] + with pytest.raises(SignatureVerificationError) as e: + assert AESCipher(key, hmac=True).decrypt(enc) == text + + assert 'Encryption signature verification failed' in str(e) + + def test_encryption_with_hmac_with_bad_key_not_strict(self): + key = 'secretstring' + text = 'ihatemysql' + enc = AESCipher(key, hmac=True).encrypt(text) + + assert isinstance(AESCipher( + 'differentsecret', hmac=True, strict_verification=False + ).decrypt(enc), InvalidDecryptedValue)