##// END OF EJS Templates
feat(archive-cache): added extra info on number of evicted caches
feat(archive-cache): added extra info on number of evicted caches

File last commit:

r5381:0a39631e default
r5434:6da054fb default
Show More
encrypt2.py
101 lines | 3.4 KiB | text/x-python | PythonLexer
encryption: added new backend using cryptography + Fernet encryption....
r3522 import os
import base64
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
encryption: unified and rewrote encryption modules to be consistent no matter what algo is used....
r4995 from rhodecode.lib.str_utils import safe_str
from rhodecode.lib.exceptions import signature_verification_error
class InvalidDecryptedValue(str):
    """String subclass used as a marker for a value that failed to decrypt."""

    def __new__(cls, content):
        """
        Build the marker from (at most) the first 16 items of `content`.

        The resulting string looks like this::

            <InvalidDecryptedValue(QkWusFgLJXR6m42v...)>

        It serves as a safe indicator that the encryption key is broken.

        :param content: the data that could not be decrypted
        """
        # only a short preview is embedded, never the full content
        preview = content[:16]
        marker = '<{}({}...)>'.format(cls.__name__, preview)
        return super().__new__(cls, marker)
encryption: added new backend using cryptography + Fernet encryption....
r3522
class Encryptor(object):
    """
    Encrypts and decrypts data with Fernet, using a key derived from
    `enc_key` via PBKDF2-HMAC and a per-message random salt.

    Encrypted values are serialized according to `key_format`, i.e.
    ``enc2$salt:<urlsafe base64 salt>$data:<fernet token>``.
    """

    # serialization template: {1} is replaced by the base64 encoded salt,
    # {2} by the encrypted token
    key_format = b'enc2$salt:{1}$data:{2}'

    # number of leading bytes stripped from the salt and data sections
    pref_len = 5  # salt:, data:

    @classmethod
    def detect_enc_algo(cls, enc_data: bytes):
        """
        Detect which algorithm produced `enc_data` based on its header.

        :param enc_data: raw (possibly encrypted) bytes
        :returns: ``'aes'`` when the ``enc$aes_hmac$`` marker is present,
            ``'fernet'`` when the ``enc2$salt`` marker is present, and
            ``None`` when neither marker is found
        :raises ValueError: when a marker is present but the data does not
            split into exactly three ``$`` separated sections
        """
        parts = enc_data.split(b'$', 3)

        if b'enc$aes_hmac$' in enc_data:
            # we expect this data is encrypted, so validate the header
            if len(parts) != 3:
                raise ValueError(f'Encrypted Data has invalid format, expected {cls.key_format}, got `{parts}`')
            return 'aes'
        elif b'enc2$salt' in enc_data:
            # we expect this data is encrypted, so validate the header
            if len(parts) != 3:
                raise ValueError(f'Encrypted Data has invalid format, expected {cls.key_format}, got `{parts}`')
            return 'fernet'

        # no known header found
        return None

    def __init__(self, enc_key: bytes):
        """
        :param enc_key: secret from which the per-message Fernet keys are derived
        """
        self.enc_key = enc_key

    def b64_encode(self, data):
        """Return `data` encoded with the URL-safe base64 alphabet."""
        return base64.urlsafe_b64encode(data)

    def b64_decode(self, data):
        """Return the bytes decoded from URL-safe base64 `data`."""
        return base64.urlsafe_b64decode(data)

    def get_encryptor(self, salt):
        """
        Uses Fernet as encryptor with HMAC signature

        The Fernet key is derived from `self.enc_key` with PBKDF2-HMAC
        (SHA512, 100000 iterations, 32 bytes output).

        :param salt: random salt used for encrypting the data
        :returns: a `Fernet` instance bound to the derived key
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA512(),
            length=32,
            salt=salt,
            iterations=100000,
            backend=default_backend()
        )
        # the derived key is passed to Fernet in URL-safe base64 form
        key = self.b64_encode(kdf.derive(self.enc_key))
        return Fernet(key)

    def _get_parts(self, enc_data):
        """
        Split serialized encrypted data into its three sections.

        :param enc_data: bytes in the `key_format` layout
        :returns: tuple of ``(prefix, decoded salt, encrypted token)``
        :raises ValueError: when the data does not consist of exactly three
            ``$`` separated sections, or when the salt is not valid base64
        """
        parts = enc_data.split(b'$', 3)
        if len(parts) != 3:
            raise ValueError(f'Encrypted Data has invalid format, expected {self.key_format}, got `{parts}`')
        prefix, salt, enc_data = parts

        try:
            # drop the leading `salt:` marker before decoding
            salt = self.b64_decode(salt[self.pref_len:])
        except (TypeError, ValueError) as exc:
            # bad base64; on python3 this is reported as binascii.Error,
            # a ValueError subclass, so TypeError alone would never match
            raise ValueError('Encrypted Data salt invalid format, expected base64 format') from exc

        # drop the leading `data:` marker
        enc_data = enc_data[self.pref_len:]
        return prefix, salt, enc_data

    def encrypt(self, data) -> bytes:
        """
        Encrypt `data` using a key derived with a fresh random 64 byte salt.

        :param data: bytes to encrypt
        :returns: the salt and encrypted token serialized per `key_format`
        """
        salt = os.urandom(64)
        encryptor = self.get_encryptor(salt)
        enc_data = encryptor.encrypt(data)
        return self.key_format.replace(b'{1}', self.b64_encode(salt)).replace(b'{2}', enc_data)

    def decrypt(self, data, safe=True) -> "bytes | InvalidDecryptedValue":
        """
        Decrypt data previously produced by :meth:`encrypt`.

        :param data: serialized encrypted bytes in the `key_format` layout
        :param safe: when true, a failed token verification returns an
            `InvalidDecryptedValue` marker instead of raising
        :returns: the decrypted bytes, or an `InvalidDecryptedValue` when
            decryption failed and `safe` is true
        :raises ValueError: when `data` has an invalid layout or salt
        """
        parts = self._get_parts(data)
        salt = parts[1]
        enc_data = parts[2]
        encryptor = self.get_encryptor(salt)
        try:
            return encryptor.decrypt(enc_data)
        except (InvalidToken,):
            decrypt_fail = InvalidDecryptedValue(safe_str(data))
            if safe:
                return decrypt_fail
            raise signature_verification_error(decrypt_fail)