# HG changeset patch
# User RhodeCode Admin
# Date 2023-03-27 19:44:36
# Node ID 676da06bbc5a64bf8750964b596b215f0fb8edf5
# Parent 4e9283a12596560a1efdf3bee03f638e9e99bf9c
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used.

- few python3 fixes as well

diff --git a/rhodecode/lib/enc_utils.py b/rhodecode/lib/enc_utils.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/enc_utils.py
@@ -0,0 +1,65 @@
+from rhodecode.lib.str_utils import safe_bytes
+from rhodecode.lib.encrypt import encrypt_data, validate_and_decrypt_data
+from rhodecode.lib.encrypt2 import Encryptor
+
+ALLOWED_ALGOS = ['aes', 'fernet']
+
+
+def get_default_algo():
+    """Return the algorithm set in rhodecode.CONFIG, or `aes` when unset."""
+    import rhodecode
+    return rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes'
+
+
+def encrypt_value(value: bytes, enc_key: bytes, algo: str = ''):
+    """
+    Encrypt `value` with `enc_key` using the `aes` or `fernet` backend.
+
+    An empty `algo` selects the configured default algorithm.
+
+    :raises ValueError: if `algo` is not one of ALLOWED_ALGOS
+    """
+    if not algo:
+        # not explicit algo, just use what's set by config
+        algo = get_default_algo()
+    if algo not in ALLOWED_ALGOS:
+        # fail loudly: falling through would hand back `value` unencrypted
+        raise ValueError(f'Bad encryption algorithm, should be {ALLOWED_ALGOS}, got: {algo}')
+
+    enc_key = safe_bytes(enc_key)
+    value = safe_bytes(value)
+
+    if algo == 'aes':
+        return encrypt_data(value, enc_key=enc_key)
+    if algo == 'fernet':
+        return Encryptor(enc_key).encrypt(value)
+
+    return value
+
+
+def decrypt_value(value: bytes, enc_key: bytes, algo: str = '', strict_mode: bool = False):
+    """
+    Decrypt `value` with `enc_key` using the `aes` or `fernet` backend.
+
+    An empty `algo` selects the configured default algorithm. `strict_mode`
+    is handed to the backend inverted, as `safe=not strict_mode`.
+
+    :raises ValueError: if `algo` is not one of ALLOWED_ALGOS
+    """
+    if not algo:
+        # not explicit algo, just use what's set by config
+        algo = get_default_algo()
+    if algo not in ALLOWED_ALGOS:
+        # fail loudly: falling through would hand back `value` still encrypted
+        raise ValueError(f'Bad encryption algorithm, should be {ALLOWED_ALGOS}, got: {algo}')
+
+    enc_key = safe_bytes(enc_key)
+    value = safe_bytes(value)
+    safe = not strict_mode
+
+    if algo == 'aes':
+        return validate_and_decrypt_data(value, enc_key, safe=safe)
+    if algo == 'fernet':
+        return Encryptor(enc_key).decrypt(value, safe=safe)
+
+    return value
diff --git a/rhodecode/lib/encrypt.py b/rhodecode/lib/encrypt.py
--- a/rhodecode/lib/encrypt.py
+++ b/rhodecode/lib/encrypt.py
@@ -24,16 +24,14 @@
 Generic encryption library for RhodeCode
 """
 import base64
+import logging
 
 from Crypto.Cipher import AES
 from Crypto import Random
 from Crypto.Hash import HMAC, SHA256
 
-from rhodecode.lib.str_utils import safe_bytes
-
-
-class SignatureVerificationError(Exception):
-    pass
+from rhodecode.lib.str_utils import safe_bytes, safe_str
+from rhodecode.lib.exceptions import signature_verification_error
 
 
 class InvalidDecryptedValue(str):
@@ -47,9 +45,13 @@ class InvalidDecryptedValue(str):
         content = '<{}({}...)>'.format(cls.__name__, content[:16])
         return str.__new__(cls, content)
 
+KEY_FORMAT = b'enc$aes_hmac${1}'
+
 
 class AESCipher(object):
-    def __init__(self, key, hmac=False, strict_verification=True):
+
+    def __init__(self, key: bytes, hmac=False, strict_verification=True):
+
         if not key:
             raise ValueError('passed key variable is empty')
         self.strict_verification = strict_verification
@@ -67,13 +69,13 @@ class AESCipher(object):
             self.hmac_key, data_without_sig, digestmod=SHA256).digest()
         return org_hmac_signature == recomputed_hmac
 
-    def encrypt(self, raw):
+    def encrypt(self, raw: bytes):
         raw = self._pad(raw)
         iv = Random.new().read(AES.block_size)
         cipher = AES.new(self.key, AES.MODE_CBC, iv)
         enc_value = cipher.encrypt(raw)
 
-        hmac_signature = ''
+        hmac_signature = b''
         if self.hmac:
             # compute hmac+sha256 on iv + enc text, we use
             # encrypt then mac method to create the signature
@@ -82,57 +84,69 @@ class AESCipher(object):
 
         return base64.b64encode(iv + enc_value + hmac_signature)
 
-    def decrypt(self, enc):
+    def decrypt(self, enc, safe=True) -> bytes | InvalidDecryptedValue:
         enc_org = enc
-        enc = base64.b64decode(enc)
+        try:
+            enc = base64.b64decode(enc)
+        except Exception:
+            logging.exception('Failed Base64 decode')
+            raise signature_verification_error('Failed Base64 decode')
 
         if self.hmac and len(enc) > self.hmac_size:
             if self.verify_hmac_signature(enc):
                 # cut off the HMAC verification digest
                 enc = enc[:-self.hmac_size]
             else:
-                if self.strict_verification:
-                    raise SignatureVerificationError(
-                        "Encryption signature verification failed. "
-                        "Please check your secret key, and/or encrypted value. "
-                        "Secret key is stored as "
-                        "`rhodecode.encrypted_values.secret` or "
-                        "`beaker.session.secret` inside .ini file")
 
-                return InvalidDecryptedValue(enc_org)
+                decrypt_fail = InvalidDecryptedValue(safe_str(enc_org))
+                if safe:
+                    return decrypt_fail
+                raise signature_verification_error(decrypt_fail)
 
         iv = enc[:AES.block_size]
         cipher = AES.new(self.key, AES.MODE_CBC, iv)
         return self._unpad(cipher.decrypt(enc[AES.block_size:]))
 
     def _pad(self, s):
-        return (s + (self.block_size - len(s) % self.block_size)
-                * chr(self.block_size - len(s) % self.block_size))
+        block_pad = (self.block_size - len(s) % self.block_size)
+        return s + block_pad * safe_bytes(chr(block_pad))
 
     @staticmethod
     def _unpad(s):
         return s[:-ord(s[len(s)-1:])]
 
 
-def validate_and_get_enc_data(enc_data, enc_key, enc_strict_mode):
+def validate_and_decrypt_data(enc_data, enc_key, enc_strict_mode=False, safe=True):
+    enc_data = safe_str(enc_data)
+
     parts = enc_data.split('$', 3)
-    if not len(parts) == 3:
-        # probably not encrypted values
+    if len(parts) != 3:
+        raise ValueError(f'Encrypted Data has invalid format, expected {KEY_FORMAT}, got {parts}')
+
+    enc_type = parts[1]
+    enc_data_part = parts[2]
+
+    if parts[0] != 'enc':
+        # parts ok but without our header ?
         return enc_data
-    else:
-        if parts[0] != 'enc':
-            # parts ok but without our header ?
-            return enc_data
 
-        # at that stage we know it's our encryption
-        if parts[1] == 'aes':
-            decrypted_data = AESCipher(enc_key).decrypt(parts[2])
-        elif parts[1] == 'aes_hmac':
-            decrypted_data = AESCipher(
-                enc_key, hmac=True,
-                strict_verification=enc_strict_mode).decrypt(parts[2])
-        else:
-            raise ValueError(
-                'Encryption type part is wrong, must be `aes` '
-                'or `aes_hmac`, got `%s` instead' % (parts[1]))
-        return decrypted_data
+    # at that stage we know it's our encryption
+    if enc_type == 'aes':
+        decrypted_data = AESCipher(enc_key).decrypt(enc_data_part, safe=safe)
+    elif enc_type == 'aes_hmac':
+        decrypted_data = AESCipher(
+            enc_key, hmac=True,
+            strict_verification=enc_strict_mode).decrypt(enc_data_part, safe=safe)
+
+    else:
+        raise ValueError(
+            f'Encryption type part is wrong, must be `aes` '
+            f'or `aes_hmac`, got `{enc_type}` instead')
+
+    return decrypted_data
+
+
+def encrypt_data(data, enc_key: bytes):
+    enc_key = safe_bytes(enc_key)
+    enc_value = AESCipher(enc_key, hmac=True).encrypt(safe_bytes(data))
+    return KEY_FORMAT.replace(b'{1}', enc_value)
diff --git a/rhodecode/lib/encrypt2.py b/rhodecode/lib/encrypt2.py
--- a/rhodecode/lib/encrypt2.py
+++ b/rhodecode/lib/encrypt2.py
@@ -5,12 +5,27 @@ from cryptography.hazmat.backends import
 from cryptography.hazmat.primitives import hashes
 from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
 
+from rhodecode.lib.str_utils import safe_str
+from rhodecode.lib.exceptions import signature_verification_error
+
+
+class InvalidDecryptedValue(str):
+
+    def __new__(cls, content):
+        """
+        This will generate something like this::
+            <InvalidDecryptedValue(QaJjJhbGciOiJIUz...)>
+        And represent a safe indicator that encryption key is broken
+        """
+        content = '<{}({}...)>'.format(cls.__name__, content[:16])
+        return str.__new__(cls, content)
+
 
 class Encryptor(object):
-    key_format = 'enc2$salt:{}$data:{}'
+    key_format = b'enc2$salt:{1}$data:{2}'
     pref_len = 5  # salt:, data:
 
-    def __init__(self, enc_key):
+    def __init__(self, enc_key: bytes):
         self.enc_key = enc_key
 
     def b64_encode(self, data):
@@ -35,9 +50,9 @@ class Encryptor(object):
         return Fernet(key)
 
     def _get_parts(self, enc_data):
-        parts = enc_data.split('$', 3)
+        parts = enc_data.split(b'$', 3)
         if len(parts) != 3:
-            raise ValueError('Encrypted Data has invalid format, expected {}'.format(self.key_format))
+            raise ValueError(f'Encrypted Data has invalid format, expected {self.key_format}, got {parts}')
         prefix, salt, enc_data = parts
 
         try:
@@ -49,13 +64,13 @@ class Encryptor(object):
         enc_data = enc_data[self.pref_len:]
         return prefix, salt, enc_data
 
-    def encrypt(self, data):
+    def encrypt(self, data) -> bytes:
         salt = os.urandom(64)
         encryptor = self.get_encryptor(salt)
         enc_data = encryptor.encrypt(data)
-        return self.key_format.format(self.b64_encode(salt), enc_data)
+        return self.key_format.replace(b'{1}', self.b64_encode(salt)).replace(b'{2}', enc_data)
 
-    def decrypt(self, data, safe=True):
+    def decrypt(self, data, safe=True) -> bytes | InvalidDecryptedValue:
         parts = self._get_parts(data)
         salt = parts[1]
         enc_data = parts[2]
@@ -63,7 +78,7 @@ class Encryptor(object):
         try:
             return encryptor.decrypt(enc_data)
         except (InvalidToken,):
+            decrypt_fail = InvalidDecryptedValue(safe_str(data))
             if safe:
-                return ''
-            else:
-                raise
+                return decrypt_fail
+            raise signature_verification_error(decrypt_fail)
diff --git a/rhodecode/model/db.py b/rhodecode/model/db.py
--- a/rhodecode/model/db.py
+++ b/rhodecode/model/db.py
@@ -63,10 +63,10 @@ from rhodecode.lib.utils2 import (
     glob2re, StrictAttributeDict, cleaned_uri, datetime_to_time)
 from rhodecode.lib.jsonalchemy import MutationObj, MutationList, JsonType, \
     JsonRaw
+from rhodecode.lib import ext_json
+from rhodecode.lib import enc_utils
 from rhodecode.lib.ext_json import json
 from rhodecode.lib.caching_query import FromCache
-from rhodecode.lib.encrypt import AESCipher, validate_and_get_enc_data
-from rhodecode.lib.encrypt2 import Encryptor
 from rhodecode.lib.exceptions import (
     ArtifactMetadataDuplicate, ArtifactMetadataBadValueType)
 from rhodecode.model.meta import Base, Session
@@ -81,7 +81,7 @@ log = logging.getLogger(__name__)
 # this is propagated from .ini file rhodecode.encrypted_values.secret or
 # beaker.session.secret if first is not set.
 # and initialized at environment.py
-ENCRYPTION_KEY = None
+ENCRYPTION_KEY = ''
 
 # used to sort permissions by types, '#' used here is not allowed to be in
 # usernames, and it's very early in sorted string.printable table.
@@ -184,12 +184,7 @@ class EncryptedTextValue(TypeDecorator):
                 'ie. not starting with enc$ or enc2$')
 
         algo = rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes'
-        if algo == 'aes':
-            return 'enc$aes_hmac$%s' % AESCipher(ENCRYPTION_KEY, hmac=True).encrypt(value)
-        elif algo == 'fernet':
-            return Encryptor(ENCRYPTION_KEY).encrypt(value)
-        else:
-            ValueError('Bad encryption algorithm, should be fernet or aes, got: {}'.format(algo))
+        return enc_utils.encrypt_value(value, enc_key=ENCRYPTION_KEY, algo=algo)
 
     def process_result_value(self, value, dialect):
         """
@@ -200,15 +195,9 @@ class EncryptedTextValue(TypeDecorator):
         if not value:
             return value
 
-        algo = rhodecode.CONFIG.get('rhodecode.encrypted_values.algorithm') or 'aes'
         enc_strict_mode = str2bool(rhodecode.CONFIG.get('rhodecode.encrypted_values.strict') or True)
-        if algo == 'aes':
-            decrypted_data = validate_and_get_enc_data(value, ENCRYPTION_KEY, enc_strict_mode)
-        elif algo == 'fernet':
-            return Encryptor(ENCRYPTION_KEY).decrypt(value)
-        else:
-            ValueError('Bad encryption algorithm, should be fernet or aes, got: {}'.format(algo))
-        return decrypted_data
+
+        return enc_utils.decrypt_value(value, enc_key=ENCRYPTION_KEY, strict_mode=enc_strict_mode)
 
 
 class BaseModel(object):
@@ -308,13 +297,15 @@ class BaseModel(object):
             attr_name, value, exist_in_session)
 
     def __repr__(self):
-        if hasattr(self, '__unicode__'):
+        # only dispatch to an overridden __str__; object.__str__ falls back
+        # to repr() and would recurse into this method forever
+        if type(self).__str__ is not object.__str__:
             # python repr needs to return str
             try:
-                return safe_str(self.__unicode__())
+                return self.__str__()
             except UnicodeDecodeError:
                 pass
-        return '<DB:%s>' % (self.__class__.__name__)
+        return f'<DB:{self.__class__.__name__}>'
 
 
 class RhodeCodeSetting(Base, BaseModel):
@@ -358,7 +349,7 @@ class RhodeCodeSetting(Base, BaseModel):
             # decode the encrypted value
             if 'encrypted' in self.app_settings_type:
                 cipher = EncryptedTextValue()
-                v = safe_unicode(cipher.process_result_value(v, None))
+                v = safe_str(cipher.process_result_value(v, None))
 
         converter = self.SETTINGS_TYPES.get(_type) or \
             self.SETTINGS_TYPES['unicode']
@@ -375,7 +366,7 @@ class RhodeCodeSetting(Base, BaseModel):
         # encode the encrypted value
         if 'encrypted' in self.app_settings_type:
             cipher = EncryptedTextValue()
-            val = safe_unicode(cipher.process_bind_param(val, None))
+            val = safe_str(cipher.process_bind_param(val, None))
         self._app_settings_value = val
 
     @hybrid_property
@@ -395,8 +386,8 @@ class RhodeCodeSetting(Base, BaseModel):
             .filter(RhodeCodeSetting.app_settings_name.startswith(prefix))\
             .all()
 
-    def __unicode__(self):
-        return u"<%s('%s:%s[%s]')>" % (
+    def __str__(self):
+        return "<%s('%s:%s[%s]')>" % (
             self.__class__.__name__,
             self.app_settings_name, self.app_settings_value,
             self.app_settings_type