diff --git a/configs/development.ini b/configs/development.ini --- a/configs/development.ini +++ b/configs/development.ini @@ -133,6 +133,11 @@ rhodecode.api.url = /_admin/api ## `SignatureVerificationError` in case of wrong key, or damaged encryption data. #rhodecode.encrypted_values.strict = false +## Pick algorithm for encryption. Either fernet (more secure) or aes (default) +## fernet is safer, and we strongly recommend switching to it. +## Due to backward compatibility aes is used as default. +#rhodecode.encrypted_values.algorithm = fernet + ## return gzipped responses from RhodeCode (static files/application) gzip_responses = false diff --git a/configs/production.ini b/configs/production.ini --- a/configs/production.ini +++ b/configs/production.ini @@ -108,6 +108,11 @@ use = egg:rhodecode-enterprise-ce ## `SignatureVerificationError` in case of wrong key, or damaged encryption data. #rhodecode.encrypted_values.strict = false +## Pick algorithm for encryption. Either fernet (more secure) or aes (default) +## fernet is safer, and we strongly recommend switching to it. +## Due to backward compatibility aes is used as default. +#rhodecode.encrypted_values.algorithm = fernet + ## return gzipped responses from RhodeCode (static files/application) gzip_responses = false diff --git a/rhodecode/lib/encrypt.py b/rhodecode/lib/encrypt.py --- a/rhodecode/lib/encrypt.py +++ b/rhodecode/lib/encrypt.py @@ -111,4 +111,28 @@ class AESCipher(object): @staticmethod def _unpad(s): - return s[:-ord(s[len(s)-1:])] \ No newline at end of file + return s[:-ord(s[len(s)-1:])] + + +def validate_and_get_enc_data(enc_data, enc_key, enc_strict_mode): + parts = enc_data.split('$', 3) + if not len(parts) == 3: + # probably not encrypted values + return enc_data + else: + if parts[0] != 'enc': + # parts ok but without our header ? + return enc_data + + # at that stage we know it's our encryption + if parts[1] == 'aes': + decrypted_data = AESCipher(enc_key).decrypt(parts[2]) + elif parts[1] == 'aes_hmac': + decrypted_data = AESCipher( + enc_key, hmac=True, + strict_verification=enc_strict_mode).decrypt(parts[2]) + else: + raise ValueError( + 'Encryption type part is wrong, must be `aes` ' + 'or `aes_hmac`, got `%s` instead' % (parts[1])) + return decrypted_data diff --git a/rhodecode/lib/encrypt2.py b/rhodecode/lib/encrypt2.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/encrypt2.py @@ -0,0 +1,69 @@ +import os +import base64 +from cryptography.fernet import Fernet, InvalidToken +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + + +class Encryptor(object): + key_format = 'enc2$salt:{}$data:{}' + pref_len = 5 # salt:, data: + + def __init__(self, enc_key): + self.enc_key = enc_key + + def b64_encode(self, data): + return base64.urlsafe_b64encode(data) + + def b64_decode(self, data): + return base64.urlsafe_b64decode(data) + + def get_encryptor(self, salt): + """ + Uses Fernet as encryptor with HMAC signature + :param salt: random salt used for encrypting the data + """ + kdf = PBKDF2HMAC( + algorithm=hashes.SHA512(), + length=32, + salt=salt, + iterations=100000, + backend=default_backend() + ) + key = self.b64_encode(kdf.derive(self.enc_key)) + return Fernet(key) + + def _get_parts(self, enc_data): + parts = enc_data.split('$', 3) + if len(parts) != 3: + raise ValueError('Encrypted Data has invalid format, expected {}'.format(self.key_format)) + prefix, salt, enc_data = parts + + try: + salt = self.b64_decode(salt[self.pref_len:]) + except TypeError: + # bad base64 + raise ValueError('Encrypted Data salt invalid format, expected base64 format') + + enc_data = enc_data[self.pref_len:] + return prefix, salt, enc_data + + def encrypt(self, data): + salt = os.urandom(64) + encryptor = self.get_encryptor(salt) + enc_data = encryptor.encrypt(data) + return self.key_format.format(self.b64_encode(salt), enc_data) + + def decrypt(self, data, safe=True): + parts = self._get_parts(data) + salt = parts[1] + enc_data = parts[2] + encryptor = self.get_encryptor(salt) + try: + return encryptor.decrypt(enc_data) + except (InvalidToken,): + if safe: + return '' + else: + raise diff --git a/rhodecode/model/db.py b/rhodecode/model/db.py --- a/rhodecode/model/db.py +++ b/rhodecode/model/db.py @@ -62,8 +62,8 @@ from rhodecode.lib.jsonalchemy import Mu JsonRaw from rhodecode.lib.ext_json import json from rhodecode.lib.caching_query import FromCache -from rhodecode.lib.encrypt import AESCipher - +from rhodecode.lib.encrypt import AESCipher, validate_and_get_enc_data +from rhodecode.lib.encrypt2 import Encryptor from rhodecode.model.meta import Base, Session URL_SEP = '/' @@ -159,44 +159,46 @@ class EncryptedTextValue(TypeDecorator): impl = Text def process_bind_param(self, value, dialect): - if not value: - return value - if value.startswith('enc$aes$') or value.startswith('enc$aes_hmac$'): - # protect against double encrypting if someone manually starts - # doing - raise ValueError('value needs to be in unencrypted format, ie. ' - 'not starting with enc$aes') - return 'enc$aes_hmac$%s' % AESCipher( - ENCRYPTION_KEY, hmac=True).encrypt(value) - - def process_result_value(self, value, dialect): + """ + Setter for storing value + """ import rhodecode - if not value: return value - parts = value.split('$', 3) - if not len(parts) == 3: - # probably not encrypted values - return value + # protect against double encrypting if values is already encrypted + if value.startswith('enc$aes$') \ + or value.startswith('enc$aes_hmac$') \ + or value.startswith('enc2$'): + raise ValueError('value needs to be in unencrypted format, ' + 'ie. not starting with enc$ or enc2$') + + algo = rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes' + if algo == 'aes': + return 'enc$aes_hmac$%s' % AESCipher(ENCRYPTION_KEY, hmac=True).encrypt(value) + elif algo == 'fernet': + return Encryptor(ENCRYPTION_KEY).encrypt(value) else: - if parts[0] != 'enc': - # parts ok but without our header ? - return value - enc_strict_mode = str2bool(rhodecode.CONFIG.get( - 'rhodecode.encrypted_values.strict') or True) - # at that stage we know it's our encryption - if parts[1] == 'aes': - decrypted_data = AESCipher(ENCRYPTION_KEY).decrypt(parts[2]) - elif parts[1] == 'aes_hmac': - decrypted_data = AESCipher( - ENCRYPTION_KEY, hmac=True, - strict_verification=enc_strict_mode).decrypt(parts[2]) - else: - raise ValueError( - 'Encryption type part is wrong, must be `aes` ' - 'or `aes_hmac`, got `%s` instead' % (parts[1])) - return decrypted_data + ValueError('Bad encryption algorithm, should be fernet or aes, got: {}'.format(algo)) + + def process_result_value(self, value, dialect): + """ + Getter for retrieving value + """ + + import rhodecode + if not value: + return value + + algo = rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes' + enc_strict_mode = str2bool(rhodecode.CONFIG.get('rhodecode.encrypted_values.strict') or True) + if algo == 'aes': + decrypted_data = validate_and_get_enc_data(value, ENCRYPTION_KEY, enc_strict_mode) + elif algo == 'fernet': + return Encryptor(ENCRYPTION_KEY).decrypt(value) + else: + ValueError('Bad encryption algorithm, should be fernet or aes, got: {}'.format(algo)) + return decrypted_data class BaseModel(object): diff --git a/rhodecode/tests/lib/test_encrypt.py b/rhodecode/tests/lib/test_encrypt.py --- a/rhodecode/tests/lib/test_encrypt.py +++ b/rhodecode/tests/lib/test_encrypt.py @@ -22,6 +22,7 @@ import pytest from rhodecode.lib.encrypt import ( AESCipher, SignatureVerificationError, InvalidDecryptedValue) +from rhodecode.lib.encrypt2 import (Encryptor, InvalidToken) class TestEncryptModule(object): @@ -74,3 +75,59 @@ class TestEncryptModule(object): assert isinstance(AESCipher( 'differentsecret', hmac=True, strict_verification=False ).decrypt(enc), InvalidDecryptedValue) + + +class TestEncryptModule2(object): + + @pytest.mark.parametrize( + "key, text", + [ + ('a', 'short'), + ('a'*64, 'too long(trimmed to 32)'), + ('a'*32, 'just enough'), + ('ąćęćę', 'non asci'), + ('$asa$asa', 'special $ used'), + ] + ) + def test_encryption(self, key, text): + enc = Encryptor(key).encrypt(text) + assert Encryptor(key).decrypt(enc) == text + + def test_encryption_with_bad_key(self): + key = 'secretstring' + text = 'ihatemysql' + enc = Encryptor(key).encrypt(text) + + assert Encryptor('differentsecret').decrypt(enc) == '' + + def test_encryption_with_bad_key_raises(self): + key = 'secretstring' + text = 'ihatemysql' + enc = Encryptor(key).encrypt(text) + + with pytest.raises(InvalidToken) as e: + Encryptor('differentsecret').decrypt(enc, safe=False) + + assert 'InvalidToken' in str(e) + + def test_encryption_with_bad_format_data(self): + key = 'secret' + text = 'ihatemysql' + enc = Encryptor(key).encrypt(text) + enc = '$xyz' + enc[3:] + + with pytest.raises(ValueError) as e: + Encryptor(key).decrypt(enc, safe=False) + + assert 'Encrypted Data has invalid format' in str(e) + + def test_encryption_with_bad_data(self): + key = 'secret' + text = 'ihatemysql' + enc = Encryptor(key).encrypt(text) + enc = enc[:-5] + + with pytest.raises(InvalidToken) as e: + Encryptor(key).decrypt(enc, safe=False) + + assert 'InvalidToken' in str(e)