encrypt2.py
101 lines
| 3.4 KiB
| text/x-python
|
PythonLexer
r3522 | import os | |||
import base64 | ||||
from cryptography.fernet import Fernet, InvalidToken | ||||
from cryptography.hazmat.backends import default_backend | ||||
from cryptography.hazmat.primitives import hashes | ||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | ||||
r4995 | from rhodecode.lib.str_utils import safe_str | |||
from rhodecode.lib.exceptions import signature_verification_error | ||||
class InvalidDecryptedValue(str):
    """
    String marker standing in for a value that could not be decrypted.

    The instance is an ordinary ``str`` whose text embeds the class name and
    the first 16 characters of the content passed to the constructor.
    """

    def __new__(cls, content):
        """
        Build the marker string, which looks like this::

            <InvalidDecryptedValue(QkWusFgLJXR6m42v...)>

        It is a safe indicator that the encryption key is broken: only a
        short preview of `content` is kept in the resulting string.

        :param content: sliceable value; its first 16 items are embedded
        """
        preview = content[:16]
        marker = f'<{cls.__name__}({preview}...)>'
        return super().__new__(cls, marker)
r3522 | ||||
class Encryptor(object):
    """
    Encrypts and decrypts byte strings with Fernet, using a key derived from
    `enc_key` via PBKDF2-HMAC-SHA512 and a random per-message salt.

    Encrypted values are serialized as ``enc2$salt:<b64 salt>$data:<token>``.
    """
    key_format = b'enc2$salt:{1}$data:{2}'

    pref_len = 5  # length of the `salt:` and `data:` field prefixes

    # (marker looked up inside the data, algorithm name reported for it);
    # order matters: the first marker found wins
    _algo_markers = (
        (b'enc$aes_hmac$', 'aes'),
        (b'enc2$salt', 'fernet'),
    )

    @classmethod
    def _check_parts(cls, parts):
        """
        Ensure a `$`-split header consists of exactly three fields.

        :param parts: result of ``enc_data.split(b'$', 3)``
        :raises ValueError: when the number of fields is not 3
        """
        if len(parts) != 3:
            # `!r` renders the bytes template exactly like the implicit
            # str() did, but without a BytesWarning under `python -b`
            raise ValueError(
                f'Encrypted Data has invalid format, '
                f'expected {cls.key_format!r}, got `{parts}`')

    @classmethod
    def detect_enc_algo(cls, enc_data: bytes):
        """
        Detect which algorithm marker is present in `enc_data`.

        :param enc_data: raw stored value, possibly encrypted
        :returns: ``'aes'``, ``'fernet'``, or ``None`` when no known marker
            is found in the data
        :raises ValueError: when a marker is found but the data does not
            split into exactly three `$`-separated fields
        """
        parts = enc_data.split(b'$', 3)

        for marker, algo in cls._algo_markers:
            if marker in enc_data:
                # we expect this data is encrypted, so validate the header
                cls._check_parts(parts)
                return algo
        return None

    def __init__(self, enc_key: bytes):
        """
        :param enc_key: secret fed into the key-derivation function
        """
        self.enc_key = enc_key

    def b64_encode(self, data):
        """Return `data` encoded with the URL-safe base64 alphabet."""
        return base64.urlsafe_b64encode(data)

    def b64_decode(self, data):
        """Return the bytes decoded from URL-safe base64 `data`."""
        return base64.urlsafe_b64decode(data)

    def get_encryptor(self, salt):
        """
        Uses Fernet as encryptor with HMAC signature

        The Fernet key is the base64 encoding of a 32 byte key derived from
        `self.enc_key` with PBKDF2-HMAC-SHA512 (100000 iterations).

        :param salt: random salt used for encrypting the data
        :returns: a `Fernet` instance bound to the derived key
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA512(),
            length=32,
            salt=salt,
            iterations=100000,
            backend=default_backend()
        )
        key = self.b64_encode(kdf.derive(self.enc_key))
        return Fernet(key)

    def _get_parts(self, enc_data):
        """
        Split serialized encrypted data into its three components.

        :param enc_data: bytes in the `key_format` layout
        :returns: tuple ``(prefix, salt, token)`` where `salt` is already
            base64-decoded and `token` has its `data:` prefix removed
        :raises ValueError: when the data does not have exactly three
            `$`-separated fields, or the salt is not valid base64
        """
        parts = enc_data.split(b'$', 3)
        self._check_parts(parts)

        prefix, salt, token = parts

        try:
            salt = self.b64_decode(salt[self.pref_len:])
        except (TypeError, ValueError) as err:
            # bad base64: Python 3 reports it as binascii.Error, which is a
            # ValueError subclass (TypeError was the Python 2 behaviour)
            raise ValueError(
                'Encrypted Data salt invalid format, '
                'expected base64 format') from err

        return prefix, salt, token[self.pref_len:]

    def encrypt(self, data) -> bytes:
        """
        Encrypt `data` under a freshly generated 64 byte random salt.

        :param data: value handed to `Fernet.encrypt`
        :returns: bytes in the `key_format` layout, carrying the
            base64-encoded salt and the Fernet token
        """
        salt = os.urandom(64)
        token = self.get_encryptor(salt).encrypt(data)
        return self.key_format \
            .replace(b'{1}', self.b64_encode(salt)) \
            .replace(b'{2}', token)

    def decrypt(self, data, safe=True) -> 'bytes | InvalidDecryptedValue':
        """
        Decrypt data previously produced by :meth:`encrypt`.

        :param data: bytes in the `key_format` layout
        :param safe: when True, a failed token verification returns an
            `InvalidDecryptedValue` marker instead of raising
        :returns: decrypted bytes, or an `InvalidDecryptedValue` when
            verification fails and `safe` is True
        :raises ValueError: when `data` has an invalid layout
        :raises: whatever `signature_verification_error` produces, when
            verification fails and `safe` is False
        """
        _prefix, salt, token = self._get_parts(data)
        encryptor = self.get_encryptor(salt)

        try:
            return encryptor.decrypt(token)
        except InvalidToken:
            decrypt_fail = InvalidDecryptedValue(safe_str(data))
            if safe:
                return decrypt_fail
            raise signature_verification_error(decrypt_fail)